mirror of
https://github.com/chylex/Discord-History-Tracker.git
synced 2025-06-20 16:54:16 +03:00
Make database connection pool asynchronous
This commit is contained in:
parent
d5720c8758
commit
9904a711f7
@ -92,7 +92,7 @@ sealed class DatabasePageModel {
|
||||
await target.AddFrom(db);
|
||||
return true;
|
||||
} finally {
|
||||
db.Dispose();
|
||||
await db.DisposeAsync();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -23,5 +23,7 @@ sealed class DummyDatabaseFile : IDatabaseFile {
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void Dispose() {}
|
||||
public ValueTask DisposeAsync() {
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ using DHT.Server.Database.Repositories;
|
||||
|
||||
namespace DHT.Server.Database;
|
||||
|
||||
public interface IDatabaseFile : IDisposable {
|
||||
public interface IDatabaseFile : IAsyncDisposable {
|
||||
string Path { get; }
|
||||
|
||||
IUserRepository Users { get; }
|
||||
|
@ -22,7 +22,7 @@ sealed class SqliteAttachmentRepository : BaseSqliteRepository, IAttachmentRepos
|
||||
}
|
||||
|
||||
public async Task<long> Count(AttachmentFilter? filter, CancellationToken cancellationToken) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
return await conn.ExecuteReaderAsync("SELECT COUNT(DISTINCT normalized_url) FROM attachments a" + filter.GenerateWhereClause("a"), static reader => reader?.GetInt64(0) ?? 0L, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ sealed class SqliteChannelRepository : BaseSqliteRepository, IChannelRepository
|
||||
}
|
||||
|
||||
public async Task Add(IReadOnlyList<Channel> channels) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using (var tx = await conn.BeginTransactionAsync()) {
|
||||
await using var cmd = conn.Upsert("channels", [
|
||||
@ -47,12 +47,12 @@ sealed class SqliteChannelRepository : BaseSqliteRepository, IChannelRepository
|
||||
}
|
||||
|
||||
public override async Task<long> Count(CancellationToken cancellationToken) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
return await conn.ExecuteReaderAsync("SELECT COUNT(*) FROM channels", static reader => reader?.GetInt64(0) ?? 0L, cancellationToken);
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<Channel> Get() {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using var cmd = conn.Command("SELECT id, server, name, parent_id, position, topic, nsfw FROM channels");
|
||||
await using var reader = await cmd.ExecuteReaderAsync();
|
||||
|
@ -21,7 +21,7 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
|
||||
}
|
||||
|
||||
public async Task AddDownload(Data.Download download) {
|
||||
using (var conn = pool.Take()) {
|
||||
await using (var conn = await pool.Take()) {
|
||||
await using var cmd = conn.Upsert("downloads", [
|
||||
("normalized_url", SqliteType.Text),
|
||||
("download_url", SqliteType.Text),
|
||||
@ -42,7 +42,7 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
|
||||
}
|
||||
|
||||
public override async Task<long> Count(CancellationToken cancellationToken) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
return await conn.ExecuteReaderAsync("SELECT COUNT(*) FROM downloads", static reader => reader?.GetInt64(0) ?? 0L, cancellationToken);
|
||||
}
|
||||
|
||||
@ -97,14 +97,14 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
|
||||
|
||||
var result = new DownloadStatusStatistics();
|
||||
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
await LoadUndownloadedStatistics(conn, result, cancellationToken);
|
||||
await LoadSuccessStatistics(conn, result, cancellationToken);
|
||||
return result;
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<Data.Download> GetWithoutData() {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using var cmd = conn.Command("SELECT normalized_url, download_url, status, size FROM downloads");
|
||||
await using var reader = await cmd.ExecuteReaderAsync();
|
||||
@ -120,7 +120,7 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
|
||||
}
|
||||
|
||||
public async Task<Data.Download> HydrateWithData(Data.Download download) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using var cmd = conn.Command("SELECT blob FROM downloads WHERE normalized_url = :url");
|
||||
cmd.AddAndSet(":url", SqliteType.Text, download.NormalizedUrl);
|
||||
@ -136,7 +136,7 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
|
||||
}
|
||||
|
||||
public async Task<DownloadedAttachment?> GetDownloadedAttachment(string normalizedUrl) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using var cmd = conn.Command(
|
||||
"""
|
||||
@ -162,7 +162,7 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
|
||||
}
|
||||
|
||||
public async Task<int> EnqueueDownloadItems(AttachmentFilter? filter, CancellationToken cancellationToken) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using var cmd = conn.Command(
|
||||
$"""
|
||||
@ -181,7 +181,7 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
|
||||
public async IAsyncEnumerable<DownloadItem> PullEnqueuedDownloadItems(int count, [EnumeratorCancellation] CancellationToken cancellationToken) {
|
||||
var found = new List<DownloadItem>();
|
||||
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using (var cmd = conn.Command("SELECT normalized_url, download_url, size FROM downloads WHERE status = :enqueued LIMIT :limit")) {
|
||||
cmd.AddAndSet(":enqueued", SqliteType.Integer, (int) DownloadStatus.Enqueued);
|
||||
@ -215,7 +215,7 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
|
||||
}
|
||||
|
||||
public async Task RemoveDownloadItems(DownloadItemFilter? filter, FilterRemovalMode mode) {
|
||||
using (var conn = pool.Take()) {
|
||||
await using (var conn = await pool.Take()) {
|
||||
await conn.ExecuteAsync(
|
||||
$"""
|
||||
-- noinspection SqlWithoutWhere
|
||||
|
@ -36,7 +36,7 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
|
||||
|
||||
bool addedAttachments = false;
|
||||
|
||||
using (var conn = pool.Take()) {
|
||||
await using (var conn = await pool.Take()) {
|
||||
await using var tx = await conn.BeginTransactionAsync();
|
||||
|
||||
await using var messageCmd = conn.Upsert("messages", [
|
||||
@ -170,7 +170,7 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
|
||||
}
|
||||
|
||||
public async Task<long> Count(MessageFilter? filter, CancellationToken cancellationToken) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
return await conn.ExecuteReaderAsync("SELECT COUNT(*) FROM messages" + filter.GenerateWhereClause(), static reader => reader?.GetInt64(0) ?? 0L, cancellationToken);
|
||||
}
|
||||
|
||||
@ -205,7 +205,7 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<Message> Get(MessageFilter? filter) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
const string AttachmentSql =
|
||||
"""
|
||||
@ -281,7 +281,7 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<ulong> GetIds(MessageFilter? filter) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using var cmd = conn.Command("SELECT message_id FROM messages" + filter.GenerateWhereClause());
|
||||
await using var reader = await cmd.ExecuteReaderAsync();
|
||||
@ -292,7 +292,7 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
|
||||
}
|
||||
|
||||
public async Task Remove(MessageFilter filter, FilterRemovalMode mode) {
|
||||
using (var conn = pool.Take()) {
|
||||
await using (var conn = await pool.Take()) {
|
||||
await conn.ExecuteAsync(
|
||||
$"""
|
||||
-- noinspection SqlWithoutWhere
|
||||
|
@ -16,7 +16,7 @@ sealed class SqliteServerRepository : BaseSqliteRepository, IServerRepository {
|
||||
}
|
||||
|
||||
public async Task Add(IReadOnlyList<Data.Server> servers) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using (var tx = await conn.BeginTransactionAsync()) {
|
||||
await using var cmd = conn.Upsert("servers", [
|
||||
@ -39,12 +39,12 @@ sealed class SqliteServerRepository : BaseSqliteRepository, IServerRepository {
|
||||
}
|
||||
|
||||
public override async Task<long> Count(CancellationToken cancellationToken) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
return await conn.ExecuteReaderAsync("SELECT COUNT(*) FROM servers", static reader => reader?.GetInt64(0) ?? 0L, cancellationToken);
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<Data.Server> Get() {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using var cmd = conn.Command("SELECT id, name, type FROM servers");
|
||||
await using var reader = await cmd.ExecuteReaderAsync();
|
||||
|
@ -16,7 +16,7 @@ sealed class SqliteUserRepository : BaseSqliteRepository, IUserRepository {
|
||||
}
|
||||
|
||||
public async Task Add(IReadOnlyList<User> users) {
|
||||
using (var conn = pool.Take()) {
|
||||
await using (var conn = await pool.Take()) {
|
||||
await using var tx = await conn.BeginTransactionAsync();
|
||||
|
||||
await using var cmd = conn.Upsert("users", [
|
||||
@ -41,12 +41,12 @@ sealed class SqliteUserRepository : BaseSqliteRepository, IUserRepository {
|
||||
}
|
||||
|
||||
public override async Task<long> Count(CancellationToken cancellationToken) {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
return await conn.ExecuteReaderAsync("SELECT COUNT(*) FROM users", static reader => reader?.GetInt64(0) ?? 0L, cancellationToken);
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<User> Get() {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
|
||||
await using var cmd = conn.Command("SELECT id, name, avatar_url, discriminator FROM users");
|
||||
await using var reader = await cmd.ExecuteReaderAsync();
|
||||
|
@ -16,14 +16,14 @@ public sealed class SqliteDatabaseFile : IDatabaseFile {
|
||||
Mode = SqliteOpenMode.ReadWriteCreate,
|
||||
};
|
||||
|
||||
var pool = new SqliteConnectionPool(connectionString, DefaultPoolSize);
|
||||
var pool = await SqliteConnectionPool.Create(connectionString, DefaultPoolSize);
|
||||
bool wasOpened;
|
||||
|
||||
try {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
wasOpened = await new Schema(conn).Setup(schemaUpgradeCallbacks);
|
||||
} catch (Exception) {
|
||||
pool.Dispose();
|
||||
await pool.DisposeAsync();
|
||||
throw;
|
||||
}
|
||||
|
||||
@ -31,7 +31,7 @@ public sealed class SqliteDatabaseFile : IDatabaseFile {
|
||||
return new SqliteDatabaseFile(path, pool);
|
||||
}
|
||||
else {
|
||||
pool.Dispose();
|
||||
await pool.DisposeAsync();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -65,18 +65,18 @@ public sealed class SqliteDatabaseFile : IDatabaseFile {
|
||||
downloads = new SqliteDownloadRepository(pool);
|
||||
}
|
||||
|
||||
public void Dispose() {
|
||||
public async ValueTask DisposeAsync() {
|
||||
users.Dispose();
|
||||
servers.Dispose();
|
||||
channels.Dispose();
|
||||
messages.Dispose();
|
||||
attachments.Dispose();
|
||||
downloads.Dispose();
|
||||
pool.Dispose();
|
||||
await pool.DisposeAsync();
|
||||
}
|
||||
|
||||
public async Task Vacuum() {
|
||||
using var conn = pool.Take();
|
||||
await using var conn = await pool.Take();
|
||||
await conn.ExecuteAsync("VACUUM");
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,6 @@ using Microsoft.Data.Sqlite;
|
||||
|
||||
namespace DHT.Server.Database.Sqlite.Utils;
|
||||
|
||||
interface ISqliteConnection : IDisposable {
|
||||
interface ISqliteConnection : IAsyncDisposable {
|
||||
SqliteConnection InnerConnection { get; }
|
||||
}
|
||||
|
@ -1,100 +1,77 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using DHT.Utils.Logging;
|
||||
using System.Threading.Tasks;
|
||||
using DHT.Utils.Collections;
|
||||
using Microsoft.Data.Sqlite;
|
||||
|
||||
namespace DHT.Server.Database.Sqlite.Utils;
|
||||
|
||||
sealed class SqliteConnectionPool : IDisposable {
|
||||
sealed class SqliteConnectionPool : IAsyncDisposable {
|
||||
public static async Task<SqliteConnectionPool> Create(SqliteConnectionStringBuilder connectionStringBuilder, int poolSize) {
|
||||
var pool = new SqliteConnectionPool(poolSize);
|
||||
await pool.InitializePooledConnections(connectionStringBuilder);
|
||||
return pool;
|
||||
}
|
||||
|
||||
private static string GetConnectionString(SqliteConnectionStringBuilder connectionStringBuilder) {
|
||||
connectionStringBuilder.Pooling = false;
|
||||
return connectionStringBuilder.ToString();
|
||||
}
|
||||
|
||||
private readonly object monitor = new ();
|
||||
private readonly Random rand = new ();
|
||||
private volatile bool isDisposed;
|
||||
private readonly int poolSize;
|
||||
private readonly List<PooledConnection> all;
|
||||
private readonly ConcurrentPool<PooledConnection> free;
|
||||
|
||||
private readonly BlockingCollection<PooledConnection> free = new (new ConcurrentStack<PooledConnection>());
|
||||
private readonly List<PooledConnection> used;
|
||||
private readonly CancellationTokenSource disposalTokenSource = new ();
|
||||
private readonly CancellationToken disposalToken;
|
||||
|
||||
public SqliteConnectionPool(SqliteConnectionStringBuilder connectionStringBuilder, int poolSize) {
|
||||
private SqliteConnectionPool(int poolSize) {
|
||||
this.poolSize = poolSize;
|
||||
this.all = new List<PooledConnection>(poolSize);
|
||||
this.free = new ConcurrentPool<PooledConnection>(poolSize);
|
||||
this.disposalToken = disposalTokenSource.Token;
|
||||
}
|
||||
|
||||
private async Task InitializePooledConnections(SqliteConnectionStringBuilder connectionStringBuilder) {
|
||||
var connectionString = GetConnectionString(connectionStringBuilder);
|
||||
|
||||
for (int i = 0; i < poolSize; i++) {
|
||||
var conn = new SqliteConnection(connectionString);
|
||||
conn.Open();
|
||||
|
||||
var pooledConn = new PooledConnection(this, conn);
|
||||
var pooledConnection = new PooledConnection(this, conn);
|
||||
|
||||
using (var cmd = pooledConn.Command("PRAGMA journal_mode=WAL")) {
|
||||
cmd.ExecuteNonQuery();
|
||||
await using (var cmd = pooledConnection.Command("PRAGMA journal_mode=WAL")) {
|
||||
await cmd.ExecuteNonQueryAsync(disposalToken);
|
||||
}
|
||||
|
||||
free.Add(pooledConn);
|
||||
}
|
||||
|
||||
used = new List<PooledConnection>(poolSize);
|
||||
}
|
||||
|
||||
private void ThrowIfDisposed() {
|
||||
ObjectDisposedException.ThrowIf(isDisposed, nameof(SqliteConnectionPool));
|
||||
}
|
||||
|
||||
public ISqliteConnection Take() {
|
||||
while (true) {
|
||||
ThrowIfDisposed();
|
||||
|
||||
lock (monitor) {
|
||||
if (free.TryTake(out var conn)) {
|
||||
used.Add(conn);
|
||||
return conn;
|
||||
}
|
||||
else {
|
||||
Log.ForType<SqliteConnectionPool>().Warn("Thread " + Environment.CurrentManagedThreadId + " is starving for connections.");
|
||||
}
|
||||
}
|
||||
|
||||
Thread.Sleep(TimeSpan.FromMilliseconds(rand.Next(100, 200)));
|
||||
all.Add(pooledConnection);
|
||||
await free.Push(pooledConnection, disposalToken);
|
||||
}
|
||||
}
|
||||
|
||||
private void Return(PooledConnection conn) {
|
||||
ThrowIfDisposed();
|
||||
|
||||
lock (monitor) {
|
||||
if (used.Remove(conn)) {
|
||||
free.Add(conn);
|
||||
}
|
||||
}
|
||||
public async Task<ISqliteConnection> Take() {
|
||||
return await free.Pop(disposalToken);
|
||||
}
|
||||
|
||||
public void Dispose() {
|
||||
if (isDisposed) {
|
||||
private async Task Return(PooledConnection conn) {
|
||||
await free.Push(conn, disposalToken);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync() {
|
||||
if (disposalToken.IsCancellationRequested) {
|
||||
return;
|
||||
}
|
||||
|
||||
isDisposed = true;
|
||||
|
||||
lock (monitor) {
|
||||
while (free.TryTake(out var conn)) {
|
||||
Close(conn.InnerConnection);
|
||||
}
|
||||
|
||||
foreach (var conn in used) {
|
||||
Close(conn.InnerConnection);
|
||||
}
|
||||
|
||||
free.Dispose();
|
||||
used.Clear();
|
||||
await disposalTokenSource.CancelAsync();
|
||||
|
||||
foreach (var conn in all) {
|
||||
await conn.InnerConnection.CloseAsync();
|
||||
await conn.InnerConnection.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private static void Close(SqliteConnection conn) {
|
||||
conn.Close();
|
||||
conn.Dispose();
|
||||
|
||||
disposalTokenSource.Dispose();
|
||||
}
|
||||
|
||||
private sealed class PooledConnection : ISqliteConnection {
|
||||
@ -107,8 +84,8 @@ sealed class SqliteConnectionPool : IDisposable {
|
||||
this.InnerConnection = conn;
|
||||
}
|
||||
|
||||
void IDisposable.Dispose() {
|
||||
pool.Return(this);
|
||||
public async ValueTask DisposeAsync() {
|
||||
await pool.Return(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,6 @@ public sealed class State : IAsyncDisposable {
|
||||
public async ValueTask DisposeAsync() {
|
||||
await Downloader.Stop();
|
||||
await Server.Stop();
|
||||
Db.Dispose();
|
||||
await Db.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
45
app/Utils/Collections/ConcurrentPool.cs
Normal file
45
app/Utils/Collections/ConcurrentPool.cs
Normal file
@ -0,0 +1,45 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace DHT.Utils.Collections;
|
||||
|
||||
public sealed class ConcurrentPool<T> {
|
||||
private readonly SemaphoreSlim mutexSemaphore;
|
||||
private readonly SemaphoreSlim availableItemSemaphore;
|
||||
private readonly Stack<T> items;
|
||||
|
||||
public ConcurrentPool(int size) {
|
||||
mutexSemaphore = new SemaphoreSlim(1);
|
||||
availableItemSemaphore = new SemaphoreSlim(0, size);
|
||||
items = new Stack<T>();
|
||||
}
|
||||
|
||||
public async Task Push(T item, CancellationToken cancellationToken) {
|
||||
await PushItem(item, cancellationToken);
|
||||
availableItemSemaphore.Release();
|
||||
}
|
||||
|
||||
public async Task<T> Pop(CancellationToken cancellationToken) {
|
||||
await availableItemSemaphore.WaitAsync(cancellationToken);
|
||||
return await PopItem(cancellationToken);
|
||||
}
|
||||
|
||||
private async Task PushItem(T item, CancellationToken cancellationToken) {
|
||||
await mutexSemaphore.WaitAsync(cancellationToken);
|
||||
try {
|
||||
items.Push(item);
|
||||
} finally {
|
||||
mutexSemaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<T> PopItem(CancellationToken cancellationToken) {
|
||||
await mutexSemaphore.WaitAsync(cancellationToken);
|
||||
try {
|
||||
return items.Pop();
|
||||
} finally {
|
||||
mutexSemaphore.Release();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user