using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace DHT.Utils.Tasks;

/// <summary>
/// Base class for a background worker that runs posted delegates one at a time.
/// Delegates are queued in a bounded channel with a capacity of one that drops the oldest
/// entry when full, so posting while another delegate is still waiting replaces the waiting one.
/// </summary>
/// <typeparam name="T">Type returned by the posted delegates.</typeparam>
public abstract class ThrottledTaskBase<T> : IDisposable {
	private readonly Channel<Func<CancellationToken, T>> taskChannel = Channel.CreateBounded<Func<CancellationToken, T>>(new BoundedChannelOptions(capacity: 1) {
		SingleReader = true,
		SingleWriter = false,
		AllowSynchronousContinuations = false,
		FullMode = BoundedChannelFullMode.DropOldest
	});

	private readonly CancellationTokenSource cancellationTokenSource = new ();

	// 0 while the object is live, 1 once Dispose has run; accessed through Interlocked only.
	private int isDisposed;

	internal ThrottledTaskBase() {}

	/// <summary>
	/// Reads queued delegates until the channel is completed or the object's cancellation token
	/// is cancelled, passing each one to <see cref="Run"/>. The cancellation token source is
	/// disposed when the loop ends.
	/// </summary>
	/// <returns>A task that completes when the reader loop has ended.</returns>
	protected async Task ReaderTask() {
		var cancellationToken = cancellationTokenSource.Token;

		try {
			await foreach (var item in taskChannel.Reader.ReadAllAsync(cancellationToken)) {
				try {
					await Run(item, cancellationToken);
				} catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
					// Only our own cancellation ends the loop. A cancellation exception that a
					// delegate raises for its own reasons (e.g. an internal timeout) is handled
					// like any other failure below, so later posts are still processed.
					throw;
				} catch (Exception) {
					// Ignore.
				}
			}
		} catch (OperationCanceledException) {
			// Ignore.
		} finally {
			cancellationTokenSource.Dispose();
		}
	}

	/// <summary>
	/// Executes a single queued delegate.
	/// </summary>
	/// <param name="func">The delegate taken from the queue.</param>
	/// <param name="cancellationToken">Token that is cancelled when this object is disposed.</param>
	/// <returns>A task that completes when the delegate has been fully handled.</returns>
	protected abstract Task Run(Func<CancellationToken, T> func, CancellationToken cancellationToken);

	/// <summary>
	/// Queues a delegate for execution. The result of the write attempt is not reported to the caller.
	/// </summary>
	/// <param name="resultComputer">Delegate to run on the reader task.</param>
	public void Post(Func<CancellationToken, T> resultComputer) {
		taskChannel.Writer.TryWrite(resultComputer);
	}

	/// <summary>
	/// Cancels the reader's token and completes the channel. Calling this more than once is a no-op.
	/// </summary>
	public void Dispose() {
		if (Interlocked.Exchange(ref isDisposed, 1) != 0) {
			return;
		}

		try {
			// Cancel before completing the channel, so the reader cannot observe completion and
			// dispose the token source ahead of this call.
			cancellationTokenSource.Cancel();
		} catch (ObjectDisposedException) {
			// The reader task disposes the token source when it ends, which can race with this call.
		}

		// TryComplete does not throw if the channel has already been completed.
		taskChannel.Writer.TryComplete();
	}
}

/// <summary>
/// Throttled worker for delegates that produce no value. After a delegate completes,
/// the result processor is started on the configured scheduler and awaited.
/// </summary>
public sealed class ThrottledTask : ThrottledTaskBase<Task> {
	private readonly Action resultProcessor;
	private readonly TaskScheduler resultScheduler;

	/// <summary>
	/// Creates the worker and starts its reader loop via <see cref="Task.Run(Func{Task})"/>.
	/// </summary>
	/// <param name="resultProcessor">Callback invoked after each delegate completes.</param>
	/// <param name="resultScheduler">Scheduler on which <paramref name="resultProcessor"/> is started.</param>
	public ThrottledTask(Action resultProcessor, TaskScheduler resultScheduler) {
		this.resultProcessor = resultProcessor;
		this.resultScheduler = resultScheduler;
		Task.Run(ReaderTask);
	}

	/// <inheritdoc />
	protected override async Task Run(Func<CancellationToken, Task> func, CancellationToken cancellationToken) {
		await func(cancellationToken);
		await Task.Factory.StartNew(resultProcessor, cancellationToken, TaskCreationOptions.None, resultScheduler);
	}
}

/// <summary>
/// Throttled worker for delegates that produce a value. After a delegate completes,
/// its result is passed to the result processor, which is started on the configured scheduler and awaited.
/// </summary>
/// <typeparam name="T">Type of the computed result.</typeparam>
public sealed class ThrottledTask<T> : ThrottledTaskBase<Task<T>> {
	private readonly Action<T> resultProcessor;
	private readonly TaskScheduler resultScheduler;

	/// <summary>
	/// Creates the worker and starts its reader loop via <see cref="Task.Run(Func{Task})"/>.
	/// </summary>
	/// <param name="resultProcessor">Callback that receives the result of each completed delegate.</param>
	/// <param name="resultScheduler">Scheduler on which <paramref name="resultProcessor"/> is started.</param>
	public ThrottledTask(Action<T> resultProcessor, TaskScheduler resultScheduler) {
		this.resultProcessor = resultProcessor;
		this.resultScheduler = resultScheduler;
		Task.Run(ReaderTask);
	}

	/// <inheritdoc />
	protected override async Task Run(Func<CancellationToken, Task<T>> func, CancellationToken cancellationToken) {
		T result = await func(cancellationToken);
		await Task.Factory.StartNew(() => resultProcessor(result), cancellationToken, TaskCreationOptions.None, resultScheduler);
	}
}