From de8d6a1e119ef8b425c8fd79e50a89bed337384b Mon Sep 17 00:00:00 2001 From: chylex Date: Wed, 17 Apr 2024 13:19:57 +0200 Subject: [PATCH] Stream downloaded files during database merges --- app/Server/Data/DownloadWithData.cs | 3 --- app/Server/Database/DatabaseExtensions.cs | 6 ++--- .../Repositories/IDownloadRepository.cs | 9 +++---- .../Repositories/SqliteDownloadRepository.cs | 25 +++++++++++++------ 4 files changed, 25 insertions(+), 18 deletions(-) delete mode 100644 app/Server/Data/DownloadWithData.cs diff --git a/app/Server/Data/DownloadWithData.cs b/app/Server/Data/DownloadWithData.cs deleted file mode 100644 index 041e7ea..0000000 --- a/app/Server/Data/DownloadWithData.cs +++ /dev/null @@ -1,3 +0,0 @@ -namespace DHT.Server.Data; - -public readonly record struct DownloadWithData(Download Download, byte[]? Data); diff --git a/app/Server/Database/DatabaseExtensions.cs b/app/Server/Database/DatabaseExtensions.cs index f78044d..e93b315 100644 --- a/app/Server/Database/DatabaseExtensions.cs +++ b/app/Server/Database/DatabaseExtensions.cs @@ -1,5 +1,4 @@ using System.Collections.Generic; -using System.IO; using System.Linq; using System.Threading.Tasks; using DHT.Server.Data; @@ -27,8 +26,9 @@ public static class DatabaseExtensions { await target.Messages.Add(batchedMessages); await foreach (var download in source.Downloads.Get()) { - var downloadWithData = await source.Downloads.HydrateWithData(download); - await target.Downloads.AddDownload(downloadWithData.Download, downloadWithData.Data is {} data ? new MemoryStream(data) : null); + if (download.Status != DownloadStatus.Success || !await source.Downloads.GetDownloadData(download.NormalizedUrl, stream => target.Downloads.AddDownload(download, stream))) { + await target.Downloads.AddDownload(download, stream: null); + } } } } diff --git a/app/Server/Database/Repositories/IDownloadRepository.cs b/app/Server/Database/Repositories/IDownloadRepository.cs index 5b61724..fa697ed 100644 --- a/app/Server/Database/Repositories/IDownloadRepository.cs +++ b/app/Server/Database/Repositories/IDownloadRepository.cs @@ -5,7 +5,6 @@ using System.Linq; using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; -using DHT.Server.Data; using DHT.Server.Data.Aggregations; using DHT.Server.Data.Filters; using DHT.Server.Download; @@ -23,8 +22,8 @@ public interface IDownloadRepository { IAsyncEnumerable Get(); - Task HydrateWithData(Data.Download download); - + Task GetDownloadData(string normalizedUrl, Func dataProcessor); + Task GetSuccessfulDownloadWithData(string normalizedUrl, Func dataProcessor); IAsyncEnumerable PullPendingDownloadItems(int count, DownloadItemFilter filter, CancellationToken cancellationToken = default); @@ -52,8 +51,8 @@ public interface IDownloadRepository { return AsyncEnumerable.Empty(); } - public Task HydrateWithData(Data.Download download) { - return Task.FromResult(new DownloadWithData(download, Data: null)); + public Task GetDownloadData(string normalizedUrl, Func dataProcessor) { + return Task.FromResult(false); } public Task GetSuccessfulDownloadWithData(string normalizedUrl, Func dataProcessor) { diff --git a/app/Server/Database/Sqlite/Repositories/SqliteDownloadRepository.cs b/app/Server/Database/Sqlite/Repositories/SqliteDownloadRepository.cs index 35f8c21..2565320 100644 --- a/app/Server/Database/Sqlite/Repositories/SqliteDownloadRepository.cs +++ b/app/Server/Database/Sqlite/Repositories/SqliteDownloadRepository.cs @@ -193,16 +193,27 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor } } - public async Task HydrateWithData(Data.Download download) { + public async Task GetDownloadData(string normalizedUrl, Func dataProcessor) { await using var conn = await pool.Take(); - await using var cmd = conn.Command("SELECT blob FROM download_blobs WHERE normalized_url = :url"); - cmd.AddAndSet(":url", SqliteType.Text, download.NormalizedUrl); - - await using var reader = await cmd.ExecuteReaderAsync(); - var data = await reader.ReadAsync() && !reader.IsDBNull(0) ? (byte[]) reader["blob"] : null; + await using var cmd = conn.Command("SELECT rowid FROM download_blobs WHERE normalized_url = :normalized_url"); + cmd.AddAndSet(":normalized_url", SqliteType.Text, normalizedUrl); - return new DownloadWithData(download, data); + long rowid; + + await using (var reader = await cmd.ExecuteReaderAsync()) { + if (!await reader.ReadAsync()) { + return false; + } + + rowid = reader.GetInt64(0); + } + + await using (var blob = new SqliteBlob(conn.InnerConnection, "download_blobs", "blob", rowid, readOnly: true)) { + await dataProcessor(blob); + } + + return true; } public async Task GetSuccessfulDownloadWithData(string normalizedUrl, Func dataProcessor) {