From c9e50e1a808df7fc05f2c6579387ea985e7776a1 Mon Sep 17 00:00:00 2001 From: chylex Date: Mon, 1 Jan 2024 09:23:28 +0100 Subject: [PATCH] Refactor database schema upgrades --- app/Desktop/Common/DatabaseGui.cs | 2 +- app/Desktop/Main/Pages/DatabasePageModel.cs | 2 +- .../Main/Screens/WelcomeScreenModel.cs | 2 +- .../Exceptions/DatabaseTooNewException.cs | 4 +- app/Server/Database/Sqlite/Schema.cs | 358 ------------------ .../Database/Sqlite/Schema/ISchemaUpgrade.cs | 8 + .../ISchemaUpgradeCallbacks.cs | 2 +- .../Sqlite/Schema/SqliteSchemaUpgradeTo2.cs | 11 + .../Sqlite/Schema/SqliteSchemaUpgradeTo3.cs | 33 ++ .../Sqlite/Schema/SqliteSchemaUpgradeTo4.cs | 19 + .../Sqlite/Schema/SqliteSchemaUpgradeTo5.cs | 12 + .../Sqlite/Schema/SqliteSchemaUpgradeTo6.cs | 138 +++++++ .../Database/Sqlite/SqliteDatabaseFile.cs | 3 +- app/Server/Database/Sqlite/SqliteSchema.cs | 191 ++++++++++ 14 files changed, 420 insertions(+), 365 deletions(-) delete mode 100644 app/Server/Database/Sqlite/Schema.cs create mode 100644 app/Server/Database/Sqlite/Schema/ISchemaUpgrade.cs rename app/Server/Database/Sqlite/{Utils => Schema}/ISchemaUpgradeCallbacks.cs (89%) create mode 100644 app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo2.cs create mode 100644 app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo3.cs create mode 100644 app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo4.cs create mode 100644 app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo5.cs create mode 100644 app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo6.cs create mode 100644 app/Server/Database/Sqlite/SqliteSchema.cs diff --git a/app/Desktop/Common/DatabaseGui.cs b/app/Desktop/Common/DatabaseGui.cs index 596daca..6c92c57 100644 --- a/app/Desktop/Common/DatabaseGui.cs +++ b/app/Desktop/Common/DatabaseGui.cs @@ -9,7 +9,7 @@ using DHT.Desktop.Dialogs.Message; using DHT.Server.Database; using DHT.Server.Database.Exceptions; using DHT.Server.Database.Sqlite; -using DHT.Server.Database.Sqlite.Utils; +using 
DHT.Server.Database.Sqlite.Schema; using DHT.Utils.Logging; namespace DHT.Desktop.Common; diff --git a/app/Desktop/Main/Pages/DatabasePageModel.cs b/app/Desktop/Main/Pages/DatabasePageModel.cs index f15b878..b937307 100644 --- a/app/Desktop/Main/Pages/DatabasePageModel.cs +++ b/app/Desktop/Main/Pages/DatabasePageModel.cs @@ -18,7 +18,7 @@ using DHT.Server; using DHT.Server.Data; using DHT.Server.Database; using DHT.Server.Database.Import; -using DHT.Server.Database.Sqlite.Utils; +using DHT.Server.Database.Sqlite.Schema; using DHT.Utils.Logging; namespace DHT.Desktop.Main.Pages; diff --git a/app/Desktop/Main/Screens/WelcomeScreenModel.cs b/app/Desktop/Main/Screens/WelcomeScreenModel.cs index 1dbd918..8a23479 100644 --- a/app/Desktop/Main/Screens/WelcomeScreenModel.cs +++ b/app/Desktop/Main/Screens/WelcomeScreenModel.cs @@ -8,7 +8,7 @@ using DHT.Desktop.Common; using DHT.Desktop.Dialogs.Message; using DHT.Desktop.Dialogs.Progress; using DHT.Server.Database; -using DHT.Server.Database.Sqlite.Utils; +using DHT.Server.Database.Sqlite.Schema; namespace DHT.Desktop.Main.Screens; diff --git a/app/Server/Database/Exceptions/DatabaseTooNewException.cs b/app/Server/Database/Exceptions/DatabaseTooNewException.cs index ba42da7..b441efb 100644 --- a/app/Server/Database/Exceptions/DatabaseTooNewException.cs +++ b/app/Server/Database/Exceptions/DatabaseTooNewException.cs @@ -5,9 +5,9 @@ namespace DHT.Server.Database.Exceptions; public sealed class DatabaseTooNewException : Exception { public int DatabaseVersion { get; } - public int CurrentVersion => Schema.Version; + public int CurrentVersion => SqliteSchema.Version; - internal DatabaseTooNewException(int databaseVersion) : base("Database is too new: " + databaseVersion + " > " + Schema.Version) { + internal DatabaseTooNewException(int databaseVersion) : base("Database is too new: " + databaseVersion + " > " + SqliteSchema.Version) { this.DatabaseVersion = databaseVersion; } } diff --git a/app/Server/Database/Sqlite/Schema.cs 
b/app/Server/Database/Sqlite/Schema.cs deleted file mode 100644 index 01318c6..0000000 --- a/app/Server/Database/Sqlite/Schema.cs +++ /dev/null @@ -1,358 +0,0 @@ -using System.Collections.Generic; -using System.Data.Common; -using System.Threading.Tasks; -using DHT.Server.Database.Exceptions; -using DHT.Server.Database.Sqlite.Utils; -using DHT.Server.Download; -using DHT.Utils.Logging; -using Microsoft.Data.Sqlite; - -namespace DHT.Server.Database.Sqlite; - -sealed class Schema { - internal const int Version = 6; - - private static readonly Log Log = Log.ForType(); - - private readonly ISqliteConnection conn; - - public Schema(ISqliteConnection conn) { - this.conn = conn; - } - - public async Task Setup(ISchemaUpgradeCallbacks callbacks) { - await conn.ExecuteAsync("CREATE TABLE IF NOT EXISTS metadata (key TEXT PRIMARY KEY, value TEXT)"); - - var dbVersionStr = await conn.ExecuteReaderAsync("SELECT value FROM metadata WHERE key = 'version'", static reader => reader?.GetString(0)); - if (dbVersionStr == null) { - await InitializeSchemas(); - } - else if (!int.TryParse(dbVersionStr, out int dbVersion) || dbVersion < 1) { - throw new InvalidDatabaseVersionException(dbVersionStr); - } - else if (dbVersion > Version) { - throw new DatabaseTooNewException(dbVersion); - } - else if (dbVersion < Version) { - var proceed = await callbacks.CanUpgrade(); - if (!proceed) { - return false; - } - - await callbacks.Start(Version - dbVersion, async reporter => await UpgradeSchemas(dbVersion, reporter)); - } - - return true; - } - - private async Task InitializeSchemas() { - await conn.ExecuteAsync(""" - CREATE TABLE users ( - id INTEGER PRIMARY KEY NOT NULL, - name TEXT NOT NULL, - avatar_url TEXT, - discriminator TEXT - ) - """); - - await conn.ExecuteAsync(""" - CREATE TABLE servers ( - id INTEGER PRIMARY KEY NOT NULL, - name TEXT NOT NULL, - type TEXT NOT NULL - ) - """); - - await conn.ExecuteAsync(""" - CREATE TABLE channels ( - id INTEGER PRIMARY KEY NOT NULL, - server 
INTEGER NOT NULL, - name TEXT NOT NULL, - parent_id INTEGER, - position INTEGER, - topic TEXT, - nsfw INTEGER - ) - """); - - await conn.ExecuteAsync(""" - CREATE TABLE messages ( - message_id INTEGER PRIMARY KEY NOT NULL, - sender_id INTEGER NOT NULL, - channel_id INTEGER NOT NULL, - text TEXT NOT NULL, - timestamp INTEGER NOT NULL - ) - """); - - await conn.ExecuteAsync(""" - CREATE TABLE attachments ( - message_id INTEGER NOT NULL, - attachment_id INTEGER NOT NULL PRIMARY KEY NOT NULL, - name TEXT NOT NULL, - type TEXT, - normalized_url TEXT NOT NULL, - download_url TEXT, - size INTEGER NOT NULL, - width INTEGER, - height INTEGER - ) - """); - - await conn.ExecuteAsync(""" - CREATE TABLE embeds ( - message_id INTEGER NOT NULL, - json TEXT NOT NULL - ) - """); - - await conn.ExecuteAsync(""" - CREATE TABLE downloads ( - normalized_url TEXT NOT NULL PRIMARY KEY, - download_url TEXT, - status INTEGER NOT NULL, - size INTEGER NOT NULL, - blob BLOB - ) - """); - - await conn.ExecuteAsync(""" - CREATE TABLE reactions ( - message_id INTEGER NOT NULL, - emoji_id INTEGER, - emoji_name TEXT, - emoji_flags INTEGER NOT NULL, - count INTEGER NOT NULL - ) - """); - - await CreateMessageEditTimestampTable(); - await CreateMessageRepliedToTable(); - - await conn.ExecuteAsync("CREATE INDEX attachments_message_ix ON attachments(message_id)"); - await conn.ExecuteAsync("CREATE INDEX embeds_message_ix ON embeds(message_id)"); - await conn.ExecuteAsync("CREATE INDEX reactions_message_ix ON reactions(message_id)"); - - await conn.ExecuteAsync("INSERT INTO metadata (key, value) VALUES ('version', " + Version + ")"); - } - - private async Task CreateMessageEditTimestampTable() { - await conn.ExecuteAsync(""" - CREATE TABLE edit_timestamps ( - message_id INTEGER PRIMARY KEY NOT NULL, - edit_timestamp INTEGER NOT NULL - ) - """); - } - - private async Task CreateMessageRepliedToTable() { - await conn.ExecuteAsync(""" - CREATE TABLE replied_to ( - message_id INTEGER PRIMARY KEY NOT NULL, 
- replied_to_id INTEGER NOT NULL - ) - """); - } - - private async Task NormalizeAttachmentUrls(ISchemaUpgradeCallbacks.IProgressReporter reporter) { - await reporter.SubWork("Preparing attachments...", 0, 0); - - var normalizedUrls = new Dictionary(); - - await using (var selectCmd = conn.Command("SELECT attachment_id, url FROM attachments")) { - await using var reader = await selectCmd.ExecuteReaderAsync(); - - while (reader.Read()) { - var attachmentId = reader.GetInt64(0); - var originalUrl = reader.GetString(1); - normalizedUrls[attachmentId] = DiscordCdn.NormalizeUrl(originalUrl); - } - } - - await using var tx = await conn.BeginTransactionAsync(); - - int totalUrls = normalizedUrls.Count; - int processedUrls = -1; - - await using (var updateCmd = conn.Command("UPDATE attachments SET download_url = url, url = :normalized_url WHERE attachment_id = :attachment_id")) { - updateCmd.Add(":attachment_id", SqliteType.Integer); - updateCmd.Add(":normalized_url", SqliteType.Text); - - foreach (var (attachmentId, normalizedUrl) in normalizedUrls) { - if (++processedUrls % 1000 == 0) { - await reporter.SubWork("Updating URLs...", processedUrls, totalUrls); - } - - updateCmd.Set(":attachment_id", attachmentId); - updateCmd.Set(":normalized_url", normalizedUrl); - updateCmd.ExecuteNonQuery(); - } - } - - await reporter.SubWork("Updating URLs...", totalUrls, totalUrls); - - await tx.CommitAsync(); - } - - private async Task NormalizeDownloadUrls(ISchemaUpgradeCallbacks.IProgressReporter reporter) { - await reporter.SubWork("Preparing downloads...", 0, 0); - - var normalizedUrlsToOriginalUrls = new Dictionary(); - var duplicateUrlsToDelete = new HashSet(); - - await using (var selectCmd = conn.Command("SELECT url FROM downloads ORDER BY CASE WHEN status = 200 THEN 0 ELSE 1 END")) { - await using var reader = await selectCmd.ExecuteReaderAsync(); - - while (reader.Read()) { - var originalUrl = reader.GetString(0); - var normalizedUrl = DiscordCdn.NormalizeUrl(originalUrl); - 
- if (!normalizedUrlsToOriginalUrls.TryAdd(normalizedUrl, originalUrl)) { - duplicateUrlsToDelete.Add(originalUrl); - } - } - } - - await conn.ExecuteAsync("PRAGMA cache_size = -20000"); - - DbTransaction tx; - - await using (tx = await conn.BeginTransactionAsync()) { - await reporter.SubWork("Deleting duplicates...", 0, 0); - - await using (var deleteCmd = conn.Delete("downloads", ("url", SqliteType.Text))) { - foreach (var duplicateUrl in duplicateUrlsToDelete) { - deleteCmd.Set(":url", duplicateUrl); - deleteCmd.ExecuteNonQuery(); - } - } - - await tx.CommitAsync(); - } - - int totalUrls = normalizedUrlsToOriginalUrls.Count; - int processedUrls = -1; - - tx = await conn.BeginTransactionAsync(); - - await using (var updateCmd = conn.Command("UPDATE downloads SET download_url = :download_url, url = :normalized_url WHERE url = :download_url")) { - updateCmd.Add(":normalized_url", SqliteType.Text); - updateCmd.Add(":download_url", SqliteType.Text); - - foreach (var (normalizedUrl, downloadUrl) in normalizedUrlsToOriginalUrls) { - if (++processedUrls % 100 == 0) { - await reporter.SubWork("Updating URLs...", processedUrls, totalUrls); - - // Not proper way of dealing with transactions, but it avoids a long commit at the end. - // Schema upgrades are already non-atomic anyways, so this doesn't make it worse. 
- await tx.CommitAsync(); - await tx.DisposeAsync(); - - tx = await conn.BeginTransactionAsync(); - updateCmd.Transaction = (SqliteTransaction) tx; - } - - updateCmd.Set(":normalized_url", normalizedUrl); - updateCmd.Set(":download_url", downloadUrl); - updateCmd.ExecuteNonQuery(); - } - } - - await reporter.SubWork("Updating URLs...", totalUrls, totalUrls); - - await tx.CommitAsync(); - await tx.DisposeAsync(); - - await conn.ExecuteAsync("PRAGMA cache_size = -2000"); - } - - private async Task UpgradeSchemas(int dbVersion, ISchemaUpgradeCallbacks.IProgressReporter reporter) { - var perf = Log.Start("from version " + dbVersion); - - await conn.ExecuteAsync("UPDATE metadata SET value = " + Version + " WHERE key = 'version'"); - - if (dbVersion <= 1) { - await reporter.MainWork("Applying schema changes...", 0, 1); - await conn.ExecuteAsync("ALTER TABLE channels ADD parent_id INTEGER"); - - perf.Step("Upgrade to version 2"); - await reporter.NextVersion(); - } - - if (dbVersion <= 2) { - await reporter.MainWork("Applying schema changes...", 0, 1); - - await CreateMessageEditTimestampTable(); - await CreateMessageRepliedToTable(); - - await conn.ExecuteAsync(""" - INSERT INTO edit_timestamps (message_id, edit_timestamp) - SELECT message_id, edit_timestamp - FROM messages - WHERE edit_timestamp IS NOT NULL - """); - - await conn.ExecuteAsync(""" - INSERT INTO replied_to (message_id, replied_to_id) - SELECT message_id, replied_to_id - FROM messages - WHERE replied_to_id IS NOT NULL - """); - - await conn.ExecuteAsync("ALTER TABLE messages DROP COLUMN replied_to_id"); - await conn.ExecuteAsync("ALTER TABLE messages DROP COLUMN edit_timestamp"); - - perf.Step("Upgrade to version 3"); - - await reporter.MainWork("Vacuuming the database...", 1, 1); - await conn.ExecuteAsync("VACUUM"); - perf.Step("Vacuum"); - - await reporter.NextVersion(); - } - - if (dbVersion <= 3) { - await conn.ExecuteAsync(""" - CREATE TABLE downloads ( - url TEXT NOT NULL PRIMARY KEY, - status 
INTEGER NOT NULL, - size INTEGER NOT NULL, - blob BLOB - ) - """); - - perf.Step("Upgrade to version 4"); - await reporter.NextVersion(); - } - - if (dbVersion <= 4) { - await reporter.MainWork("Applying schema changes...", 0, 1); - await conn.ExecuteAsync("ALTER TABLE attachments ADD width INTEGER"); - await conn.ExecuteAsync("ALTER TABLE attachments ADD height INTEGER"); - - perf.Step("Upgrade to version 5"); - await reporter.NextVersion(); - } - - if (dbVersion <= 5) { - await reporter.MainWork("Applying schema changes...", 0, 3); - await conn.ExecuteAsync("ALTER TABLE attachments ADD download_url TEXT"); - await conn.ExecuteAsync("ALTER TABLE downloads ADD download_url TEXT"); - - await reporter.MainWork("Updating attachments...", 1, 3); - await NormalizeAttachmentUrls(reporter); - - await reporter.MainWork("Updating downloads...", 2, 3); - await NormalizeDownloadUrls(reporter); - - await reporter.MainWork("Applying schema changes...", 3, 3); - await conn.ExecuteAsync("ALTER TABLE attachments RENAME COLUMN url TO normalized_url"); - await conn.ExecuteAsync("ALTER TABLE downloads RENAME COLUMN url TO normalized_url"); - - perf.Step("Upgrade to version 6"); - await reporter.NextVersion(); - } - - perf.End(); - } -}
diff --git a/app/Server/Database/Sqlite/Schema/ISchemaUpgrade.cs b/app/Server/Database/Sqlite/Schema/ISchemaUpgrade.cs
new file mode 100644
index 0000000..ff589f5
--- /dev/null
+++ b/app/Server/Database/Sqlite/Schema/ISchemaUpgrade.cs
@@ -0,0 +1,14 @@
+using System.Threading.Tasks;
+using DHT.Server.Database.Sqlite.Utils;
+
+namespace DHT.Server.Database.Sqlite.Schema;
+
+/// <summary>
+/// A single schema upgrade step that can be run against a database connection.
+/// </summary>
+interface ISchemaUpgrade {
+    /// <summary>
+    /// Executes the upgrade step on <paramref name="conn"/>, reporting progress through <paramref name="reporter"/>.
+    /// </summary>
+    Task Run(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter);
+}
diff --git a/app/Server/Database/Sqlite/Utils/ISchemaUpgradeCallbacks.cs b/app/Server/Database/Sqlite/Schema/ISchemaUpgradeCallbacks.cs similarity index 89% rename from app/Server/Database/Sqlite/Utils/ISchemaUpgradeCallbacks.cs rename to app/Server/Database/Sqlite/Schema/ISchemaUpgradeCallbacks.cs index 013806e..56736e6 100644 --- a/app/Server/Database/Sqlite/Utils/ISchemaUpgradeCallbacks.cs +++ b/app/Server/Database/Sqlite/Schema/ISchemaUpgradeCallbacks.cs @@ -1,7 +1,7 @@ using System; using System.Threading.Tasks; -namespace DHT.Server.Database.Sqlite.Utils; +namespace DHT.Server.Database.Sqlite.Schema; public interface ISchemaUpgradeCallbacks { Task CanUpgrade();
diff --git a/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo2.cs b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo2.cs
new file mode 100644
index 0000000..b249fe4
--- /dev/null
+++ b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo2.cs
@@ -0,0 +1,14 @@
+using System.Threading.Tasks;
+using DHT.Server.Database.Sqlite.Utils;
+
+namespace DHT.Server.Database.Sqlite.Schema;
+
+/// <summary>
+/// Upgrades the schema from version 1 to 2 by adding the <c>parent_id</c> column to the <c>channels</c> table.
+/// </summary>
+sealed class SqliteSchemaUpgradeTo2 : ISchemaUpgrade {
+    async Task ISchemaUpgrade.Run(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
+        await reporter.MainWork("Applying schema changes...", 0, 1);
+        await conn.ExecuteAsync("ALTER TABLE channels ADD parent_id INTEGER");
+    }
+}
diff --git a/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo3.cs b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo3.cs
new file mode 100644
index 0000000..16f5df6
--- /dev/null
+++ b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo3.cs
@@ -0,0 +1,36 @@
+using System.Threading.Tasks;
+using DHT.Server.Database.Sqlite.Utils;
+
+namespace DHT.Server.Database.Sqlite.Schema;
+
+/// <summary>
+/// Upgrades the schema from version 2 to 3 by moving edit timestamps and reply references out of the <c>messages</c> table into their own tables.
+/// </summary>
+sealed class SqliteSchemaUpgradeTo3 : ISchemaUpgrade {
+    async Task ISchemaUpgrade.Run(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
+        await reporter.MainWork("Applying schema changes...", 0, 1);
+
+        await SqliteSchema.CreateMessageEditTimestampTable(conn);
+        await SqliteSchema.CreateMessageRepliedToTable(conn);
+
+        await conn.ExecuteAsync("""
+                                INSERT INTO edit_timestamps (message_id, edit_timestamp)
+                                SELECT message_id, edit_timestamp
+                                FROM messages
+                                WHERE edit_timestamp IS NOT NULL
+                                """);
+
+        await conn.ExecuteAsync("""
+                                INSERT INTO replied_to (message_id, replied_to_id)
+                                SELECT message_id, replied_to_id
+                                FROM messages
+                                WHERE replied_to_id IS NOT NULL
+                                """);
+
+        await conn.ExecuteAsync("ALTER TABLE messages DROP COLUMN replied_to_id");
+        await conn.ExecuteAsync("ALTER TABLE messages DROP COLUMN edit_timestamp");
+
+        await reporter.MainWork("Vacuuming the database...", 1, 1);
+        await conn.ExecuteAsync("VACUUM");
+    }
+}
diff --git a/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo4.cs b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo4.cs
new file mode 100644
index 0000000..f39b96d
--- /dev/null
+++ b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo4.cs
@@ -0,0 +1,22 @@
+using System.Threading.Tasks;
+using DHT.Server.Database.Sqlite.Utils;
+
+namespace DHT.Server.Database.Sqlite.Schema;
+
+/// <summary>
+/// Upgrades the schema from version 3 to 4 by creating the <c>downloads</c> table.
+/// </summary>
+sealed class SqliteSchemaUpgradeTo4 : ISchemaUpgrade {
+    async Task ISchemaUpgrade.Run(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
+        await reporter.MainWork("Applying schema changes...", 0, 1);
+
+        await conn.ExecuteAsync("""
+                                CREATE TABLE downloads (
+                                    url TEXT NOT NULL PRIMARY KEY,
+                                    status INTEGER NOT NULL,
+                                    size INTEGER NOT NULL,
+                                    blob BLOB
+                                )
+                                """);
+    }
+}
diff --git a/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo5.cs b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo5.cs
new file mode 100644
index 0000000..6555408
--- /dev/null
+++ b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo5.cs
@@ -0,0 +1,15 @@
+using System.Threading.Tasks;
+using DHT.Server.Database.Sqlite.Utils;
+
+namespace DHT.Server.Database.Sqlite.Schema;
+
+/// <summary>
+/// Upgrades the schema from version 4 to 5 by adding <c>width</c> and <c>height</c> columns to the <c>attachments</c> table.
+/// </summary>
+sealed class SqliteSchemaUpgradeTo5 : ISchemaUpgrade {
+    async Task ISchemaUpgrade.Run(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
+        await reporter.MainWork("Applying schema changes...", 0, 1);
+        await conn.ExecuteAsync("ALTER TABLE attachments ADD width INTEGER");
+        await conn.ExecuteAsync("ALTER TABLE attachments ADD height INTEGER");
+    }
+}
diff --git a/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo6.cs b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo6.cs
new file mode 100644
index 0000000..97aaca4
--- /dev/null
+++ b/app/Server/Database/Sqlite/Schema/SqliteSchemaUpgradeTo6.cs
@@ -0,0 +1,149 @@
+using System.Collections.Generic;
+using System.Data.Common;
+using System.Threading.Tasks;
+using DHT.Server.Database.Sqlite.Utils;
+using DHT.Server.Download;
+using Microsoft.Data.Sqlite;
+
+namespace DHT.Server.Database.Sqlite.Schema;
+
+/// <summary>
+/// Upgrades the schema from version 5 to 6: adds <c>download_url</c> columns, rewrites stored URLs
+/// into their normalized form, and renames the <c>url</c> columns to <c>normalized_url</c>.
+/// </summary>
+sealed class SqliteSchemaUpgradeTo6 : ISchemaUpgrade {
+    async Task ISchemaUpgrade.Run(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
+        await reporter.MainWork("Applying schema changes...", 0, 3);
+        await conn.ExecuteAsync("ALTER TABLE attachments ADD download_url TEXT");
+        await conn.ExecuteAsync("ALTER TABLE downloads ADD download_url TEXT");
+
+        await reporter.MainWork("Updating attachments...", 1, 3);
+        await NormalizeAttachmentUrls(conn, reporter);
+
+        await reporter.MainWork("Updating downloads...", 2, 3);
+        await NormalizeDownloadUrls(conn, reporter);
+
+        await reporter.MainWork("Applying schema changes...", 3, 3);
+        await conn.ExecuteAsync("ALTER TABLE attachments RENAME COLUMN url TO normalized_url");
+        await conn.ExecuteAsync("ALTER TABLE downloads RENAME COLUMN url TO normalized_url");
+    }
+
+    /// <summary>
+    /// Moves each attachment's current URL into <c>download_url</c> and stores its normalized form in <c>url</c>.
+    /// </summary>
+    private static async Task NormalizeAttachmentUrls(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
+        await reporter.SubWork("Preparing attachments...", 0, 0);
+
+        var normalizedUrls = new Dictionary<long, string>();
+
+        await using (var selectCmd = conn.Command("SELECT attachment_id, url FROM attachments")) {
+            await using var reader = await selectCmd.ExecuteReaderAsync();
+
+            while (reader.Read()) {
+                var attachmentId = reader.GetInt64(0);
+                var originalUrl = reader.GetString(1);
+                normalizedUrls[attachmentId] = DiscordCdn.NormalizeUrl(originalUrl);
+            }
+        }
+
+        await using var tx = await conn.BeginTransactionAsync();
+
+        int totalUrls = normalizedUrls.Count;
+        int processedUrls = -1;
+
+        await using (var updateCmd = conn.Command("UPDATE attachments SET download_url = url, url = :normalized_url WHERE attachment_id = :attachment_id")) {
+            updateCmd.Add(":attachment_id", SqliteType.Integer);
+            updateCmd.Add(":normalized_url", SqliteType.Text);
+
+            foreach (var (attachmentId, normalizedUrl) in normalizedUrls) {
+                if (++processedUrls % 1000 == 0) {
+                    await reporter.SubWork("Updating URLs...", processedUrls, totalUrls);
+                }
+
+                updateCmd.Set(":attachment_id", attachmentId);
+                updateCmd.Set(":normalized_url", normalizedUrl);
+                updateCmd.ExecuteNonQuery();
+            }
+        }
+
+        await reporter.SubWork("Updating URLs...", totalUrls, totalUrls);
+
+        await tx.CommitAsync();
+    }
+
+    /// <summary>
+    /// Normalizes the URLs of stored downloads, first deleting rows whose URL normalizes to one that an
+    /// earlier row (rows with status 200 are read first) already produced.
+    /// </summary>
+    private static async Task NormalizeDownloadUrls(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
+        await reporter.SubWork("Preparing downloads...", 0, 0);
+
+        var normalizedUrlsToOriginalUrls = new Dictionary<string, string>();
+        var duplicateUrlsToDelete = new HashSet<string>();
+
+        await using (var selectCmd = conn.Command("SELECT url FROM downloads ORDER BY CASE WHEN status = 200 THEN 0 ELSE 1 END")) {
+            await using var reader = await selectCmd.ExecuteReaderAsync();
+
+            while (reader.Read()) {
+                var originalUrl = reader.GetString(0);
+                var normalizedUrl = DiscordCdn.NormalizeUrl(originalUrl);
+
+                if (!normalizedUrlsToOriginalUrls.TryAdd(normalizedUrl, originalUrl)) {
+                    duplicateUrlsToDelete.Add(originalUrl);
+                }
+            }
+        }
+
+        await conn.ExecuteAsync("PRAGMA cache_size = -20000");
+
+        DbTransaction tx;
+
+        await using (tx = await conn.BeginTransactionAsync()) {
+            await reporter.SubWork("Deleting duplicates...", 0, 0);
+
+            await using (var deleteCmd = conn.Delete("downloads", ("url", SqliteType.Text))) {
+                foreach (var duplicateUrl in duplicateUrlsToDelete) {
+                    deleteCmd.Set(":url", duplicateUrl);
+                    deleteCmd.ExecuteNonQuery();
+                }
+            }
+
+            await tx.CommitAsync();
+        }
+
+        int totalUrls = normalizedUrlsToOriginalUrls.Count;
+        int processedUrls = -1;
+
+        tx = await conn.BeginTransactionAsync();
+
+        await using (var updateCmd = conn.Command("UPDATE downloads SET download_url = :download_url, url = :normalized_url WHERE url = :download_url")) {
+            updateCmd.Add(":normalized_url", SqliteType.Text);
+            updateCmd.Add(":download_url", SqliteType.Text);
+
+            foreach (var (normalizedUrl, downloadUrl) in normalizedUrlsToOriginalUrls) {
+                if (++processedUrls % 100 == 0) {
+                    await reporter.SubWork("Updating URLs...", processedUrls, totalUrls);
+
+                    // Not proper way of dealing with transactions, but it avoids a long commit at the end.
+                    // Schema upgrades are already non-atomic anyways, so this doesn't make it worse.
+                    await tx.CommitAsync();
+                    await tx.DisposeAsync();
+
+                    tx = await conn.BeginTransactionAsync();
+                    updateCmd.Transaction = (SqliteTransaction) tx;
+                }
+
+                updateCmd.Set(":normalized_url", normalizedUrl);
+                updateCmd.Set(":download_url", downloadUrl);
+                updateCmd.ExecuteNonQuery();
+            }
+        }
+
+        await reporter.SubWork("Updating URLs...", totalUrls, totalUrls);
+
+        await tx.CommitAsync();
+        await tx.DisposeAsync();
+
+        await conn.ExecuteAsync("PRAGMA cache_size = -2000");
+    }
+}
diff --git a/app/Server/Database/Sqlite/SqliteDatabaseFile.cs b/app/Server/Database/Sqlite/SqliteDatabaseFile.cs index c664ee6..02a6c75 100644 --- a/app/Server/Database/Sqlite/SqliteDatabaseFile.cs +++ b/app/Server/Database/Sqlite/SqliteDatabaseFile.cs @@ -2,6 +2,7 @@ using System; using System.Threading.Tasks; using DHT.Server.Database.Repositories; using DHT.Server.Database.Sqlite.Repositories; +using DHT.Server.Database.Sqlite.Schema; using DHT.Server.Database.Sqlite.Utils; using Microsoft.Data.Sqlite; @@ -21,7 +22,7 @@ public sealed class SqliteDatabaseFile : IDatabaseFile { try { await using var conn = await pool.Take(); - wasOpened = await new
Schema(conn).Setup(schemaUpgradeCallbacks); + wasOpened = await new SqliteSchema(conn).Setup(schemaUpgradeCallbacks); } catch (Exception) { await pool.DisposeAsync(); throw;
diff --git a/app/Server/Database/Sqlite/SqliteSchema.cs b/app/Server/Database/Sqlite/SqliteSchema.cs
new file mode 100644
index 0000000..0980f83
--- /dev/null
+++ b/app/Server/Database/Sqlite/SqliteSchema.cs
@@ -0,0 +1,203 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using DHT.Server.Database.Exceptions;
+using DHT.Server.Database.Sqlite.Schema;
+using DHT.Server.Database.Sqlite.Utils;
+using DHT.Utils.Logging;
+
+namespace DHT.Server.Database.Sqlite;
+
+/// <summary>
+/// Creates the database schema for new databases and upgrades databases that use an older schema version.
+/// </summary>
+sealed class SqliteSchema {
+    internal const int Version = 6;
+
+    private static readonly Log Log = Log.ForType<SqliteSchema>();
+
+    private readonly ISqliteConnection conn;
+
+    public SqliteSchema(ISqliteConnection conn) {
+        this.conn = conn;
+    }
+
+    /// <summary>
+    /// Initializes the schema if the database has no stored version, or upgrades it if the stored version is older than <see cref="Version"/>.
+    /// </summary>
+    /// <returns><c>false</c> if an upgrade was needed but <see cref="ISchemaUpgradeCallbacks.CanUpgrade"/> declined it, <c>true</c> otherwise.</returns>
+    /// <exception cref="InvalidDatabaseVersionException">The stored version is not an integer, or is less than 1.</exception>
+    /// <exception cref="DatabaseTooNewException">The stored version is greater than <see cref="Version"/>.</exception>
+    public async Task<bool> Setup(ISchemaUpgradeCallbacks callbacks) {
+        await conn.ExecuteAsync("CREATE TABLE IF NOT EXISTS metadata (key TEXT PRIMARY KEY, value TEXT)");
+
+        var dbVersionStr = await conn.ExecuteReaderAsync("SELECT value FROM metadata WHERE key = 'version'", static reader => reader?.GetString(0));
+        if (dbVersionStr == null) {
+            await InitializeSchemas();
+        }
+        else if (!int.TryParse(dbVersionStr, out int dbVersion) || dbVersion < 1) {
+            throw new InvalidDatabaseVersionException(dbVersionStr);
+        }
+        else if (dbVersion > Version) {
+            throw new DatabaseTooNewException(dbVersion);
+        }
+        else if (dbVersion < Version) {
+            var proceed = await callbacks.CanUpgrade();
+            if (!proceed) {
+                return false;
+            }
+
+            await callbacks.Start(Version - dbVersion, async reporter => await UpgradeSchemas(dbVersion, reporter));
+        }
+
+        return true;
+    }
+
+    /// <summary>
+    /// Creates all tables and indexes of the current schema and records <see cref="Version"/> in the metadata table.
+    /// </summary>
+    private async Task InitializeSchemas() {
+        await conn.ExecuteAsync("""
+                                CREATE TABLE users (
+                                    id INTEGER PRIMARY KEY NOT NULL,
+                                    name TEXT NOT NULL,
+                                    avatar_url TEXT,
+                                    discriminator TEXT
+                                )
+                                """);
+
+        await conn.ExecuteAsync("""
+                                CREATE TABLE servers (
+                                    id INTEGER PRIMARY KEY NOT NULL,
+                                    name TEXT NOT NULL,
+                                    type TEXT NOT NULL
+                                )
+                                """);
+
+        await conn.ExecuteAsync("""
+                                CREATE TABLE channels (
+                                    id INTEGER PRIMARY KEY NOT NULL,
+                                    server INTEGER NOT NULL,
+                                    name TEXT NOT NULL,
+                                    parent_id INTEGER,
+                                    position INTEGER,
+                                    topic TEXT,
+                                    nsfw INTEGER
+                                )
+                                """);
+
+        await conn.ExecuteAsync("""
+                                CREATE TABLE messages (
+                                    message_id INTEGER PRIMARY KEY NOT NULL,
+                                    sender_id INTEGER NOT NULL,
+                                    channel_id INTEGER NOT NULL,
+                                    text TEXT NOT NULL,
+                                    timestamp INTEGER NOT NULL
+                                )
+                                """);
+
+        await conn.ExecuteAsync("""
+                                CREATE TABLE attachments (
+                                    message_id INTEGER NOT NULL,
+                                    attachment_id INTEGER NOT NULL PRIMARY KEY NOT NULL,
+                                    name TEXT NOT NULL,
+                                    type TEXT,
+                                    normalized_url TEXT NOT NULL,
+                                    download_url TEXT,
+                                    size INTEGER NOT NULL,
+                                    width INTEGER,
+                                    height INTEGER
+                                )
+                                """);
+
+        await conn.ExecuteAsync("""
+                                CREATE TABLE embeds (
+                                    message_id INTEGER NOT NULL,
+                                    json TEXT NOT NULL
+                                )
+                                """);
+
+        await conn.ExecuteAsync("""
+                                CREATE TABLE reactions (
+                                    message_id INTEGER NOT NULL,
+                                    emoji_id INTEGER,
+                                    emoji_name TEXT,
+                                    emoji_flags INTEGER NOT NULL,
+                                    count INTEGER NOT NULL
+                                )
+                                """);
+
+        await CreateMessageEditTimestampTable(conn);
+        await CreateMessageRepliedToTable(conn);
+
+        await conn.ExecuteAsync("""
+                                CREATE TABLE downloads (
+                                    normalized_url TEXT NOT NULL PRIMARY KEY,
+                                    download_url TEXT,
+                                    status INTEGER NOT NULL,
+                                    size INTEGER NOT NULL,
+                                    blob BLOB
+                                )
+                                """);
+
+        await conn.ExecuteAsync("CREATE INDEX attachments_message_ix ON attachments(message_id)");
+        await conn.ExecuteAsync("CREATE INDEX embeds_message_ix ON embeds(message_id)");
+        await conn.ExecuteAsync("CREATE INDEX reactions_message_ix ON reactions(message_id)");
+
+        await conn.ExecuteAsync("INSERT INTO metadata (key, value) VALUES ('version', " + Version + ")");
+    }
+
+    /// <summary>
+    /// Creates the <c>edit_timestamps</c> table.
+    /// </summary>
+    internal static async Task CreateMessageEditTimestampTable(ISqliteConnection conn) {
+        await conn.ExecuteAsync("""
+                                CREATE TABLE edit_timestamps (
+                                    message_id INTEGER PRIMARY KEY NOT NULL,
+                                    edit_timestamp INTEGER NOT NULL
+                                )
+                                """);
+    }
+
+    /// <summary>
+    /// Creates the <c>replied_to</c> table.
+    /// </summary>
+    internal static async Task CreateMessageRepliedToTable(ISqliteConnection conn) {
+        await conn.ExecuteAsync("""
+                                CREATE TABLE replied_to (
+                                    message_id INTEGER PRIMARY KEY NOT NULL,
+                                    replied_to_id INTEGER NOT NULL
+                                )
+                                """);
+    }
+
+    /// <summary>
+    /// Runs the registered upgrade steps one version at a time, starting from <paramref name="dbVersion"/>.
+    /// </summary>
+    private async Task UpgradeSchemas(int dbVersion, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
+        var upgrades = new Dictionary<int, ISchemaUpgrade> {
+            { 1, new SqliteSchemaUpgradeTo2() },
+            { 2, new SqliteSchemaUpgradeTo3() },
+            { 3, new SqliteSchemaUpgradeTo4() },
+            { 4, new SqliteSchemaUpgradeTo5() },
+            { 5, new SqliteSchemaUpgradeTo6() },
+        };
+
+        var perf = Log.Start("from version " + dbVersion);
+
+        for (int fromVersion = dbVersion; fromVersion < Version; fromVersion++) {
+            var toVersion = fromVersion + 1;
+
+            if (upgrades.TryGetValue(fromVersion, out var upgrade)) {
+                await upgrade.Run(conn, reporter);
+            }
+
+            // Record each completed step, so the stored version always reflects the last upgrade that finished.
+            await conn.ExecuteAsync("UPDATE metadata SET value = " + toVersion + " WHERE key = 'version'");
+
+            perf.Step("Upgrade to version " + toVersion);
+            await reporter.NextVersion();
+        }
+
+        perf.End();
+    }
+}