Improved the async lazy-loading collection by scheduling simultaneous fetch requests instead of hammering a potential database connection in the background

This commit is contained in:
Markus Ewald 2025-07-29 14:15:10 +02:00
parent d549697692
commit 246c90f7c8
2 changed files with 938 additions and 62 deletions

View file

@ -24,6 +24,8 @@ using System.Diagnostics;
using System.Threading.Tasks;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Collections.Concurrent;
#if !NO_SPECIALIZED_COLLECTIONS
using System.Collections.Specialized;
@ -45,7 +47,8 @@ namespace Nuclex.Avalonia.Collections {
#if !NO_SPECIALIZED_COLLECTIONS
INotifyCollectionChanged,
#endif
IObservableCollection<TItem> {
IObservableCollection<TItem>,
IDisposable {
#region class Enumerator
@ -191,6 +194,17 @@ namespace Nuclex.Avalonia.Collections {
this.objectList = (IList)this.typedList;
this.pageSize = pageSize;
this.fetchedPages = new bool[0];
this.requestedPages = new ConcurrentQueue<int>();
this.cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>Stops all queued fetches and frees the collection's resources</summary>
public void Dispose() {
if(this.cancellationTokenSource != null) {
this.cancellationTokenSource.Cancel();
this.cancellationTokenSource.Dispose();
this.cancellationTokenSource = null!;
}
}
/// <summary>
@ -708,6 +722,8 @@ namespace Nuclex.Avalonia.Collections {
return;
}
itemCount = this.assumedCount.Value;
//if(thi)
}
// If the page is already fetched (or in flight), do nothing
@ -730,70 +746,103 @@ namespace Nuclex.Avalonia.Collections {
Interlocked.MemoryBarrier();
this.fetchedPages[pageIndex] = true; // Prevent double-fetch
// Remember the placeholder items that are going to be replaced by
// the fetch operation below,allowing us to report these in our change notification
var placeholderItems = new List<TItem>(capacity: count);
for(int index = 0; index < count; ++index) {
placeholderItems.Add(this.typedList[index + offset]);
// We act as if the whole list was filled with place holders from the start,
// but only realize these as needed (so no change notification for these).
// If we did it right, the user will not be able to ever see the uninitialized
// items, yet we only create placeholders when they are really needed.
//OnReplaced(default(TItem), this.typedList[index + offset], index);
}
// Now request the items on the page. This request will run asynchronously,
// but we'll use an await to get to deliver change notifications once
// the page has been fetched and the items are there.
int fetchedItemCount;
try {
#if DEBUG
System.Diagnostics.Trace.WriteLine(
$"avor: background-fetching items ${offset} +${count}"
);
#endif
fetchedItemCount = await FetchItemsAsync(this.typedList, offset, count);
#if DEBUG
System.Diagnostics.Trace.WriteLine(
$"avor: finished background-fetching items ${offset} +${count}"
);
#endif
}
catch(Exception error) {
// Do not let the error bubble up into what is likely an unsuspecting
// data grid control or virtualized list widget. Let the user decide what
// to do about the error - i.e. set an error flag in their view model.
HandleFetchError(
error, $"Failed to fetch list items {offset} to {offset + count}"
);
this.fetchedPages[pageIndex] = false; // We failed!
return; // Leave the placeholder items in
}
if(fetchedItemCount < this.pageSize) {
itemCount = offset + fetchedItemCount;
lock(this) {
this.assumedCount = itemCount;
// If another thread is already fetching pages, just schedule this additional page
// instead of kicking off another async fetch which could potentially lead to
// hundreds of async tasks dog-piling on a database connection.
lock(this) {
if(this.isFetching) {
this.requestedPages.TryAdd(pageIndex);
return;
} else {
this.isFetching = true;
}
#if DEBUG
++this.version;
#endif
}
// The count may have been adjusted if this truncated the list,
// so recalculate the actual number of items. Then send out change
// notifications for the items that have now been fetched.
count = Math.Min(itemCount - offset, this.pageSize);
#if DEBUG
System.Diagnostics.Trace.WriteLine(
$"avor: sending 'replace' notifications for items ${offset} +${count}"
);
#endif
for(int index = 0; index < count; ++index) {
OnReplaced(placeholderItems[index], this.typedList[index + offset], index + offset);
}
{
CancellationToken canceller = this.cancellationTokenSource.Token;
// Fetch items from the database. While we do this, other requests may come in
// and will see that one thread is already doing a fetch, then queue their page index
// on the list of requested pages list. This loop will keep running until either
// cancellation occurs or until there are no more requested pages.
while(!canceller.IsCancellationRequested) {
offset = pageIndex * this.pageSize;
count = Math.Min(itemCount - offset, this.pageSize);
// Remember the previous items that are going to be replaced by
// the fetch operation below,allowing us to report these in our change notification
var previousItems = new List<TItem>(capacity: count);
for(int index = 0; index < count; ++index) {
previousItems.Add(this.typedList[index + offset]);
}
// Now request the items on the page. This request will run asynchronously,
// but we'll use an await to get to deliver change notifications once
// the page has been fetched and the items are there.
int fetchedItemCount;
try {
#if DEBUG
System.Diagnostics.Trace.WriteLine(
$"avor: background-fetching items ${offset} +${count}"
);
#endif
fetchedItemCount = await FetchItemsAsync(this.typedList, offset, count);
#if DEBUG
System.Diagnostics.Trace.WriteLine(
$"avor: finished background-fetching items ${offset} +${count}"
);
#endif
}
catch(Exception error) {
// Do not let the error bubble up into what is likely an unsuspecting
// data grid control or virtualized list widget. Let the user decide what
// to do about the error - i.e. set an error flag in their view model.
HandleFetchError(
error, $"Failed to fetch list items {offset} to {offset + count}"
);
this.fetchedPages[pageIndex] = false; // We failed!
lock(this) {
this.isFetching = false;
}
return; // Leave the placeholder items in, do not process the queue further
}
if(fetchedItemCount < this.pageSize) {
itemCount = offset + fetchedItemCount;
lock(this) {
this.assumedCount = itemCount;
}
#if DEBUG
++this.version;
#endif
}
// The count may have been adjusted if this truncated the list,
// so recalculate the actual number of items. Then send out change
// notifications for the items that have now been fetched.
count = Math.Min(itemCount - offset, this.pageSize);
#if DEBUG
System.Diagnostics.Trace.WriteLine(
$"avor: sending 'replace' notifications for items ${offset} +${count}"
);
#endif
for(int index = 0; index < count; ++index) {
OnReplaced(previousItems[index], this.typedList[index + offset], index + offset);
}
// See if there is another page we need to fetch. If there is, continue
// with that page, otherwise, we're done, so we clear the isFetching flag
// to make the next caller that wants items fetch them rather than queue them.
lock(this) {
if(!this.requestedPages.TryTake(out pageIndex)) {
this.isFetching = false;
break;
}
}
} // while not cancellation requested
} // beauty scope
}
/// <summary>Number of items the collection believes it has</summary>
@ -806,6 +855,17 @@ namespace Nuclex.Avalonia.Collections {
private IList<TItem> typedList;
/// <summary>The wrapped list under its object interface</summary>
private IList objectList;
/// <summary>Pages still waiting to be fetched</summary>
private IProducerConsumerCollection<int> requestedPages;
/// <summary>Whether the collection is already fetching pages</summary>
/// <remarks>
/// This is set while a page is being fetched. If it is set, additional
/// requested pages will be queued in the <see cref="requestedPages" />
/// queue to avoid dog-piling on an SQL connection.
/// </remarks>
private bool isFetching; // Could be improved to an int to allow N simultaneous fetches
/// <summary>Allows for cancellation of the whole lazy-loaded collection</summary>
private CancellationTokenSource cancellationTokenSource;
#if DEBUG
/// <summary>Used to detect when enumerators go out of sync</summary>
private int version;