Write downloads to temporary files to reduce blocking concurrent database accesses

References #168
This commit is contained in:
chylex 2024-10-22 11:28:09 +02:00
parent 5b7312109b
commit 95361d2e01
No known key found for this signature in database
GPG Key ID: 4DE42C8F19A80548

View File

@ -1,4 +1,5 @@
using System;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reactive.Subjects;
@ -70,10 +71,15 @@ sealed class DownloaderTask : IAsyncDisposable {
var client = new HttpClient(new SocketsHttpHandler {
ConnectTimeout = TimeSpan.FromSeconds(30)
});
client.Timeout = Timeout.InfiniteTimeSpan;
client.DefaultRequestHeaders.UserAgent.ParseAdd(UserAgent);
string tempFileName = Path.GetTempFileName();
log.Debug("Using temporary file: " + tempFileName);
await using var tempFileStream = new FileStream(tempFileName, FileMode.Create, FileAccess.ReadWrite, FileShare.Read, 4096, FileOptions.DeleteOnClose);
while (!cancellationToken.IsCancellationRequested) {
var item = await downloadQueue.Reader.ReadAsync(cancellationToken);
log.Debug("Downloading " + item.DownloadUrl + "...");
@ -81,15 +87,7 @@ sealed class DownloaderTask : IAsyncDisposable {
try {
var response = await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, item.DownloadUrl), HttpCompletionOption.ResponseHeadersRead, cancellationToken);
response.EnsureSuccessStatusCode();
if (response.Content.Headers.ContentLength is {} contentLength) {
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
await db.Downloads.AddDownload(item.ToSuccess(contentLength), stream);
}
else {
await db.Downloads.AddDownload(item.ToFailure(), stream: null);
log.Error("Download response has no content length: " + item.DownloadUrl);
}
await HandleResponse(response, tempFileStream, item);
} catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) {
// Ignore.
} catch (TaskCanceledException e) when (e.InnerException is TimeoutException) {
@ -111,6 +109,27 @@ sealed class DownloaderTask : IAsyncDisposable {
}
}
private async Task HandleResponse(HttpResponseMessage response, FileStream tempFileStream, DownloadItem item) {
if (response.Content.Headers.ContentLength is not {} contentLength) {
throw new InvalidOperationException("Download response has no content length: " + item.DownloadUrl);
}
try {
if (tempFileStream.Length != 0) {
throw new InvalidOperationException("Temporary file is not empty: " + tempFileStream.Name);
}
await using (var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken)) {
await responseStream.CopyToAsync(tempFileStream, cancellationToken);
}
tempFileStream.Seek(0, SeekOrigin.Begin);
await db.Downloads.AddDownload(item.ToSuccess(contentLength), tempFileStream);
} finally {
tempFileStream.SetLength(0);
}
}
public async ValueTask DisposeAsync() {
try {
await cancellationTokenSource.CancelAsync();