Add support for cancelling async value computation

This commit is contained in:
chylex 2022-05-28 21:33:30 +02:00
parent 15e8b9da63
commit b13b85dedd
No known key found for this signature in database
GPG Key ID: 4DE42C8F19A80548
4 changed files with 68 additions and 19 deletions

View File

@ -153,7 +153,7 @@ namespace DHT.Desktop.Main.Controls {
else { else {
exportedMessageCount = null; exportedMessageCount = null;
UpdateFilterStatisticsText(); UpdateFilterStatisticsText();
exportedMessageCountComputer.Compute(_ => db.CountMessages(filter)); exportedMessageCountComputer.Compute(() => db.CountMessages(filter));
} }
} }

View File

@ -2,7 +2,6 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DHT.Server.Data; using DHT.Server.Data;
using DHT.Server.Data.Filters; using DHT.Server.Data.Filters;
@ -458,7 +457,7 @@ LEFT JOIN replied_to rt ON m.message_id = rt.message_id" + filter.GenerateWhereC
Statistics.TotalUsers = conn.SelectScalar("SELECT COUNT(*) FROM users") as long? ?? 0; Statistics.TotalUsers = conn.SelectScalar("SELECT COUNT(*) FROM users") as long? ?? 0;
} }
private long ComputeMessageStatistics(CancellationToken token) { private long ComputeMessageStatistics() {
using var conn = pool.Take(); using var conn = pool.Take();
return conn.SelectScalar("SELECT COUNT(*) FROM messages") as long? ?? 0L; return conn.SelectScalar("SELECT COUNT(*) FROM messages") as long? ?? 0L;
} }

View File

@ -11,8 +11,10 @@ namespace DHT.Utils.Tasks {
private readonly object stateLock = new (); private readonly object stateLock = new ();
private CancellationTokenSource? currentCancellationTokenSource; private SoftHardCancellationToken? currentCancellationTokenSource;
private Func<CancellationToken, TValue>? currentComputeFunction; private bool wasHardCancelled = false;
private Func<TValue>? currentComputeFunction;
private bool hasComputeFunctionChanged = false; private bool hasComputeFunctionChanged = false;
private AsyncValueComputer(Action<TValue> resultProcessor, TaskScheduler resultTaskScheduler, bool processOutdatedResults) { private AsyncValueComputer(Action<TValue> resultProcessor, TaskScheduler resultTaskScheduler, bool processOutdatedResults) {
@ -21,12 +23,21 @@ namespace DHT.Utils.Tasks {
this.processOutdatedResults = processOutdatedResults; this.processOutdatedResults = processOutdatedResults;
} }
public void Compute(Func<CancellationToken, TValue> func) { public void Cancel() {
lock (stateLock) { lock (stateLock) {
wasHardCancelled = true;
currentCancellationTokenSource?.RequestHardCancellation();
}
}
public void Compute(Func<TValue> func) {
lock (stateLock) {
wasHardCancelled = false;
if (currentComputeFunction != null) { if (currentComputeFunction != null) {
currentComputeFunction = func; currentComputeFunction = func;
hasComputeFunctionChanged = true; hasComputeFunctionChanged = true;
currentCancellationTokenSource?.Cancel(); currentCancellationTokenSource?.RequestSoftCancellation();
} }
else { else {
EnqueueComputation(func); EnqueueComputation(func);
@ -35,23 +46,22 @@ namespace DHT.Utils.Tasks {
} }
[SuppressMessage("ReSharper", "MethodSupportsCancellation")] [SuppressMessage("ReSharper", "MethodSupportsCancellation")]
private void EnqueueComputation(Func<CancellationToken, TValue> func) { private void EnqueueComputation(Func<TValue> func) {
var cancellationTokenSource = new CancellationTokenSource(); var cancellationTokenSource = new SoftHardCancellationToken();
var cancellationToken = cancellationTokenSource.Token;
currentCancellationTokenSource?.Cancel(); currentCancellationTokenSource?.RequestSoftCancellation();
currentCancellationTokenSource = cancellationTokenSource; currentCancellationTokenSource = cancellationTokenSource;
currentComputeFunction = func; currentComputeFunction = func;
hasComputeFunctionChanged = false; hasComputeFunctionChanged = false;
var task = Task.Run(() => func(cancellationToken)); var task = Task.Run(func);
task.ContinueWith(t => { task.ContinueWith(t => {
if (processOutdatedResults || !cancellationToken.IsCancellationRequested) { if (!cancellationTokenSource.IsCancelled(processOutdatedResults)) {
resultProcessor(t.Result); resultProcessor(t.Result);
} }
}, CancellationToken.None, TaskContinuationOptions.NotOnFaulted, resultTaskScheduler); }, CancellationToken.None, TaskContinuationOptions.NotOnFaulted, resultTaskScheduler);
task.ContinueWith(_ => { task.ContinueWith(_ => {
lock (stateLock) { lock (stateLock) {
cancellationTokenSource.Dispose(); cancellationTokenSource.Dispose();
@ -60,11 +70,12 @@ namespace DHT.Utils.Tasks {
currentCancellationTokenSource = null; currentCancellationTokenSource = null;
} }
if (hasComputeFunctionChanged) { if (hasComputeFunctionChanged && !wasHardCancelled) {
EnqueueComputation(currentComputeFunction); EnqueueComputation(currentComputeFunction);
} }
else { else {
currentComputeFunction = null; currentComputeFunction = null;
hasComputeFunctionChanged = false;
} }
} }
}); });
@ -72,9 +83,9 @@ namespace DHT.Utils.Tasks {
public sealed class Single { public sealed class Single {
private readonly AsyncValueComputer<TValue> baseComputer; private readonly AsyncValueComputer<TValue> baseComputer;
private readonly Func<CancellationToken, TValue> resultComputer; private readonly Func<TValue> resultComputer;
internal Single(AsyncValueComputer<TValue> baseComputer, Func<CancellationToken, TValue> resultComputer) { internal Single(AsyncValueComputer<TValue> baseComputer, Func<TValue> resultComputer) {
this.baseComputer = baseComputer; this.baseComputer = baseComputer;
this.resultComputer = resultComputer; this.resultComputer = resultComputer;
} }
@ -107,7 +118,7 @@ namespace DHT.Utils.Tasks {
return new AsyncValueComputer<TValue>(resultProcessor, resultTaskScheduler, processOutdatedResults); return new AsyncValueComputer<TValue>(resultProcessor, resultTaskScheduler, processOutdatedResults);
} }
public Single BuildWithComputer(Func<CancellationToken, TValue> resultComputer) { public Single BuildWithComputer(Func<TValue> resultComputer) {
return new Single(Build(), resultComputer); return new Single(Build(), resultComputer);
} }
} }

View File

@ -0,0 +1,39 @@
using System;
using System.Threading;
namespace DHT.Utils.Tasks {
/// <summary>
/// Manages a pair of cancellation tokens that follow these rules:
/// <list type="number">
/// <item><description>If the soft token is cancelled, the hard token remains uncancelled.</description></item>
/// <item><description>If the hard token is cancelled, the soft token is also cancelled.</description></item>
/// </list>
/// </summary>
sealed class SoftHardCancellationToken : IDisposable {
private readonly CancellationTokenSource soft;
private readonly CancellationTokenSource hard;
public SoftHardCancellationToken() {
this.soft = new CancellationTokenSource();
this.hard = new CancellationTokenSource();
}
public bool IsCancelled(bool onlyHardCancellation) {
return (onlyHardCancellation ? hard : soft).IsCancellationRequested;
}
public void RequestSoftCancellation() {
soft.Cancel();
}
public void RequestHardCancellation() {
soft.Cancel();
hard.Cancel();
}
public void Dispose() {
soft.Dispose();
hard.Dispose();
}
}
}