Stream downloaded files directly into database

This commit is contained in:
chylex 2024-04-17 11:00:18 +02:00
parent c8d8d95daa
commit 70c04fc986
No known key found for this signature in database
GPG Key ID: 4DE42C8F19A80548
6 changed files with 51 additions and 30 deletions

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using DHT.Server.Data;
@ -26,7 +27,8 @@ public static class DatabaseExtensions {
await target.Messages.Add(batchedMessages);
await foreach (var download in source.Downloads.Get()) {
await target.Downloads.AddDownload(await source.Downloads.HydrateWithData(download));
var downloadWithData = await source.Downloads.HydrateWithData(download);
await target.Downloads.AddDownload(downloadWithData.Download, downloadWithData.Data is {} data ? new MemoryStream(data) : null);
}
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
@ -14,7 +15,7 @@ namespace DHT.Server.Database.Repositories;
public interface IDownloadRepository {
IObservable<long> TotalCount { get; }
Task AddDownload(DownloadWithData item);
Task AddDownload(Data.Download item, Stream? stream);
Task<long> Count(DownloadItemFilter filter, CancellationToken cancellationToken = default);
@ -35,7 +36,7 @@ public interface IDownloadRepository {
internal sealed class Dummy : IDownloadRepository {
public IObservable<long> TotalCount { get; } = Observable.Return(0L);
public Task AddDownload(DownloadWithData item) {
public Task AddDownload(Data.Download item, Stream? stream) {
return Task.CompletedTask;
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@ -66,9 +67,7 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
}
}
public async Task AddDownload(DownloadWithData item) {
var (download, data) = item;
public async Task AddDownload(Data.Download item, Stream? stream) {
await using (var conn = await pool.Take()) {
await conn.BeginTransactionAsync();
@ -80,27 +79,34 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
("size", SqliteType.Integer),
]);
metadataCmd.Set(":normalized_url", download.NormalizedUrl);
metadataCmd.Set(":download_url", download.DownloadUrl);
metadataCmd.Set(":status", (int) download.Status);
metadataCmd.Set(":type", download.Type);
metadataCmd.Set(":size", download.Size);
metadataCmd.Set(":normalized_url", item.NormalizedUrl);
metadataCmd.Set(":download_url", item.DownloadUrl);
metadataCmd.Set(":status", (int) item.Status);
metadataCmd.Set(":type", item.Type);
metadataCmd.Set(":size", item.Size);
await metadataCmd.ExecuteNonQueryAsync();
if (data == null) {
if (stream == null) {
await using var deleteBlobCmd = conn.Command("DELETE FROM download_blobs WHERE normalized_url = :normalized_url");
deleteBlobCmd.AddAndSet(":normalized_url", SqliteType.Text, download.NormalizedUrl);
deleteBlobCmd.AddAndSet(":normalized_url", SqliteType.Text, item.NormalizedUrl);
await deleteBlobCmd.ExecuteNonQueryAsync();
}
else {
await using var upsertBlobCmd = conn.Upsert("download_blobs", [
("normalized_url", SqliteType.Text),
("blob", SqliteType.Blob)
]);
await using var upsertBlobCmd = conn.Command(
"""
INSERT INTO download_blobs (normalized_url, blob)
VALUES (:normalized_url, ZEROBLOB(:blob_length))
ON CONFLICT (normalized_url) DO UPDATE SET blob = excluded.blob
RETURNING rowid
"""
);
upsertBlobCmd.AddAndSet(":normalized_url", SqliteType.Text, item.NormalizedUrl);
upsertBlobCmd.AddAndSet(":blob_length", SqliteType.Integer, item.Size);
long rowid = await upsertBlobCmd.ExecuteLongScalarAsync();
upsertBlobCmd.Set(":normalized_url", download.NormalizedUrl);
upsertBlobCmd.Set(":blob", data);
await upsertBlobCmd.ExecuteNonQueryAsync();
await using var blob = new SqliteBlob(conn.InnerConnection, "download_blobs", "blob", rowid);
await stream.CopyToAsync(blob);
}
await conn.CommitTransactionAsync();

View File

@ -25,6 +25,10 @@ static class SqliteExtensions {
return await reader.ReadAsync(cancellationToken) ? readFunction(reader) : readFunction(null);
}
/// <summary>
/// Executes the command and returns the first column of the first row as a <see cref="long"/>.
/// </summary>
/// <param name="command">The command to execute; it is expected to yield a single integer value (for example via a <c>RETURNING rowid</c> clause).</param>
/// <returns>The scalar result unboxed to <see cref="long"/>.</returns>
/// <exception cref="System.InvalidOperationException">The command produced no result.</exception>
/// <exception cref="System.InvalidCastException">The result is not a boxed <see cref="long"/>.</exception>
public static async Task<long> ExecuteLongScalarAsync(this SqliteCommand command) {
	object? result = await command.ExecuteScalarAsync();
	
	// Unboxing null would surface as an unhelpful NullReferenceException, so report the missing result explicitly.
	if (result is null) {
		throw new System.InvalidOperationException("Command returned no result: " + command.CommandText);
	}
	
	return (long) result;
}
public static SqliteCommand Insert(this ISqliteConnection conn, string tableName, (string Name, SqliteType Type)[] columns) {
string columnNames = string.Join(',', columns.Select(static c => c.Name));

View File

@ -10,13 +10,12 @@ public readonly struct DownloadItem {
public string? Type { get; init; }
public ulong? Size { get; init; }
internal DownloadWithData ToSuccess(byte[] data) {
var size = (ulong) Math.Max(data.LongLength, 0);
return new DownloadWithData(new Data.Download(NormalizedUrl, DownloadUrl, DownloadStatus.Success, Type, size), data);
internal Data.Download ToSuccess(long size) {
return new Data.Download(NormalizedUrl, DownloadUrl, DownloadStatus.Success, Type, (ulong) Math.Max(size, 0));
}
internal DownloadWithData ToFailure(HttpStatusCode? statusCode = null) {
internal Data.Download ToFailure(HttpStatusCode? statusCode = null) {
var status = statusCode.HasValue ? (DownloadStatus) (int) statusCode : DownloadStatus.GenericError;
return new DownloadWithData(new Data.Download(NormalizedUrl, DownloadUrl, status, Type, Size), Data: null);
return new Data.Download(NormalizedUrl, DownloadUrl, status, Type, Size);
}
}

View File

@ -79,18 +79,27 @@ sealed class DownloaderTask : IAsyncDisposable {
log.Debug("Downloading " + item.DownloadUrl + "...");
try {
var downloadedBytes = await client.GetByteArrayAsync(item.DownloadUrl, cancellationToken);
await db.Downloads.AddDownload(item.ToSuccess(downloadedBytes));
var response = await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, item.DownloadUrl), HttpCompletionOption.ResponseHeadersRead, cancellationToken);
response.EnsureSuccessStatusCode();
if (response.Content.Headers.ContentLength is {} contentLength) {
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
await db.Downloads.AddDownload(item.ToSuccess(contentLength), stream);
}
else {
await db.Downloads.AddDownload(item.ToFailure(), stream: null);
log.Error("Download response has no content length: " + item.DownloadUrl);
}
} catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) {
// Ignore.
} catch (TaskCanceledException e) when (e.InnerException is TimeoutException) {
await db.Downloads.AddDownload(item.ToFailure());
await db.Downloads.AddDownload(item.ToFailure(), stream: null);
log.Error("Download timed out: " + item.DownloadUrl);
} catch (HttpRequestException e) {
await db.Downloads.AddDownload(item.ToFailure(e.StatusCode));
await db.Downloads.AddDownload(item.ToFailure(e.StatusCode), stream: null);
log.Error(e);
} catch (Exception e) {
await db.Downloads.AddDownload(item.ToFailure());
await db.Downloads.AddDownload(item.ToFailure(), stream: null);
log.Error(e);
} finally {
try {