Stream downloaded files during database merges

This commit is contained in:
chylex 2024-04-17 13:19:57 +02:00
parent d79e6f53b4
commit de8d6a1e11
No known key found for this signature in database
GPG Key ID: 4DE42C8F19A80548
4 changed files with 25 additions and 18 deletions

View File

@@ -1,3 +0,0 @@
namespace DHT.Server.Data;
public readonly record struct DownloadWithData(Download Download, byte[]? Data);

View File

@@ -1,5 +1,4 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using DHT.Server.Data;
@@ -27,8 +26,9 @@ public static class DatabaseExtensions {
await target.Messages.Add(batchedMessages);
await foreach (var download in source.Downloads.Get()) {
var downloadWithData = await source.Downloads.HydrateWithData(download);
await target.Downloads.AddDownload(downloadWithData.Download, downloadWithData.Data is {} data ? new MemoryStream(data) : null);
if (download.Status != DownloadStatus.Success || !await source.Downloads.GetDownloadData(download.NormalizedUrl, stream => target.Downloads.AddDownload(download, stream))) {
await target.Downloads.AddDownload(download, stream: null);
}
}
}
}

View File

@@ -5,7 +5,6 @@ using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using DHT.Server.Data;
using DHT.Server.Data.Aggregations;
using DHT.Server.Data.Filters;
using DHT.Server.Download;
@@ -23,8 +22,8 @@ public interface IDownloadRepository {
IAsyncEnumerable<Data.Download> Get();
Task<DownloadWithData> HydrateWithData(Data.Download download);
Task<bool> GetDownloadData(string normalizedUrl, Func<Stream, Task> dataProcessor);
Task<bool> GetSuccessfulDownloadWithData(string normalizedUrl, Func<Data.Download, Stream, Task> dataProcessor);
IAsyncEnumerable<DownloadItem> PullPendingDownloadItems(int count, DownloadItemFilter filter, CancellationToken cancellationToken = default);
@@ -52,8 +51,8 @@ public interface IDownloadRepository {
return AsyncEnumerable.Empty<Data.Download>();
}
public Task<DownloadWithData> HydrateWithData(Data.Download download) {
return Task.FromResult(new DownloadWithData(download, Data: null));
public Task<bool> GetDownloadData(string normalizedUrl, Func<Stream, Task> dataProcessor) {
return Task.FromResult(false);
}
public Task<bool> GetSuccessfulDownloadWithData(string normalizedUrl, Func<Data.Download, Stream, Task> dataProcessor) {

View File

@@ -193,16 +193,27 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
}
}
public async Task<DownloadWithData> HydrateWithData(Data.Download download) {
public async Task<bool> GetDownloadData(string normalizedUrl, Func<Stream, Task> dataProcessor) {
await using var conn = await pool.Take();
await using var cmd = conn.Command("SELECT blob FROM download_blobs WHERE normalized_url = :url");
cmd.AddAndSet(":url", SqliteType.Text, download.NormalizedUrl);
await using var reader = await cmd.ExecuteReaderAsync();
var data = await reader.ReadAsync() && !reader.IsDBNull(0) ? (byte[]) reader["blob"] : null;
await using var cmd = conn.Command("SELECT rowid FROM download_blobs WHERE normalized_url = :normalized_url");
cmd.AddAndSet(":normalized_url", SqliteType.Text, normalizedUrl);
return new DownloadWithData(download, data);
long rowid;
await using (var reader = await cmd.ExecuteReaderAsync()) {
if (!await reader.ReadAsync()) {
return false;
}
rowid = reader.GetInt64(0);
}
await using (var blob = new SqliteBlob(conn.InnerConnection, "download_blobs", "blob", rowid, readOnly: true)) {
await dataProcessor(blob);
}
return true;
}
public async Task<bool> GetSuccessfulDownloadWithData(string normalizedUrl, Func<Data.Download, Stream, Task> dataProcessor) {