This commit is contained in:
norohind 2024-04-08 23:13:48 +03:00
commit b06716de4f
8 changed files with 39851 additions and 0 deletions

5
.dockerignore Normal file
View File

@ -0,0 +1,5 @@
.idea
.mypy_cache
__pycache__
tests
Dockerfile

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.idea
.mypy_cache
__pycache__

7
Dockerfile Normal file
View File

@ -0,0 +1,7 @@
FROM docker.io/python:3.11
COPY change_path.py /bin/change_path.py
COPY nmv.py /bin/nmv
COPY shared.py /bin/shared.py
CMD ["/bin/bash"]

171
change_path.py Executable file
View File

@ -0,0 +1,171 @@
#!/bin/python3
# This script changes the path for a folder or file in Navidrome's database, allowing music files to be
# moved or renamed on the file system without losing associated metadata in Navidrome. Since the original
# version, it has been updated to account for the media_file IDs, which are calculated from the path value
# and referenced in several tables.
#
# This script is based on Navidrome version 0.49.2. If you are running an older version of Navidrome, it
# will likely fail. If you are running a newer version of Navidrome, your mileage may vary.
#
# It does NOT make any modifications to the file system - only to the Navidrome database.
#
# It does not rescan the file; it assumes nothing has changed but the path. If you're moving files
# and also updating their contents (e.g. tags or bitrate), run this to change the path(s) in the
# database, and then run a full scan to update the metadata.
#
# Place this file in the same directory as navidrome.db, which is /var/lib/navidrome on Linux, and be sure
# to use fully qualified paths for arguments. It must be run as a user that has write access to the
# navidrome.db file.
#
# Generic use - note that you may need to use python3 instead of python, depending on your system:
# python change_path.py FROM_PATH TO_PATH
#
# Example: Rename/move a folder (note trailing slashes):
# python change_path.py /mnt/music/artists/Bjork/ /mnt/music/artists/Björk/
#
# Example: Rename a song file (use quotes for paths with spaces):
# python change_path.py "/mnt/music/artists/Test 1/song.mp3" "/mnt/music/artists/Test 2/01 - Song.mp3"
#
# The script's output lists each path updated along with the MD5 ID calculated from it and the row
# count updates for each table referencing the old ID.
#
# Note that Navidrome's scanner will automatically remove files from its database if they're found to
# be missing, so it's important to stop the Navidrome server process before moving files, and to update
# the database with this script prior to restarting it.
#
# Steps to use:
# 1. Stop the Navidrome server.
# 2. Make a backup of your navidrome.db file, so you can roll back any unwanted changes.
# 3. Move or rename folders and files as needed on the file system.
# 4. Run this script to update the Navidrome database for any files/folders moved or renamed.
# 5. Start Navidrome service.
# 6. Optionally run a full scan if file contents have changed.
#
# Source: https://gist.github.com/bagaag/3d64e3349b6ed3bfd6e01813222db055
#
import hashlib
import os
import sqlite3
import sys
from shared import Song, get_media_by_path
def exec_update(sql: str, params: tuple, conn: sqlite3.Connection):
cur = conn.cursor()
cur.execute(sql, params)
# conn.commit()
rc = cur.rowcount
cur.close()
return rc
def replace_values(res: tuple[Song, ...], to_path: str, from_path: str, conn: sqlite3.Connection):
replaced = []
for row in res:
old_id = row.id
old_path = str(row.path)
if from_path.endswith('/'):
new_path = old_path.replace(from_path, to_path)
else:
new_path = to_path
new_id = md5(new_path)
sql = f"""
UPDATE media_file
SET path = ?, id = ?
WHERE path = ? and id = ?;
"""
media_file = exec_update(sql, (new_path, new_id, old_path, old_id), conn)
sql = f"""
UPDATE annotation
SET item_id = ?
WHERE item_id = ?
AND item_type='media_file';
"""
annotation = exec_update(sql, (new_id, old_id), conn)
sql = f"""
UPDATE media_file_genres
SET media_file_id = ?
WHERE media_file_id = ?;
"""
media_file_genres = exec_update(sql, (new_id, old_id), conn)
sql = f"""
UPDATE playlist_tracks
SET media_file_id = ?
WHERE media_file_id = ?;
"""
playlist_tracks = exec_update(sql, (new_id, old_id), conn)
sql = f"""
UPDATE bookmark
SET item_id = ?
WHERE item_id = ?
AND item_type = 'media_file';
"""
bookmark = exec_update(sql, (new_id, old_id), conn)
sql = f"""
UPDATE album
SET embed_art_path = ?
WHERE embed_art_path = ?;
"""
album = exec_update(sql, (new_path, old_path), conn)
replaced.append({
'old_id': old_id,
'old_path': old_path,
'new_id': new_id,
'new_path': new_path,
'media_file': media_file,
'annotation': annotation,
'media_file_genres': media_file_genres,
'playlist_tracks': playlist_tracks,
'bookmark': bookmark,
'album': album
})
return replaced
def md5(s: str):
return hashlib.md5(s.encode('utf-8')).hexdigest()
def main(conn: sqlite3.Connection, from_path: str, to_path: str):
res = get_media_by_path(conn, from_path)
print(f'Found {len(res)} path matches.')
updated = replace_values(res, to_path, from_path, conn)
for update in updated:
print('FROM: ' + update['old_path'])
print(' ' + update['old_id'])
print('TO: ' + update['new_path'])
print(' ' + update['new_id'])
print(' media_file: ' + str(update['media_file']))
print(' annotation: ' + str(update['annotation']))
print(' media_file_genres: ' + str(update['media_file_genres']))
print(' playlist_tracks: ' + str(update['playlist_tracks']))
print(' bookmark: ' + str(update['bookmark']))
print(' album: ' + str(update['album']))
def cli_entrypoint():
if len(sys.argv) < 3:
print("Usage: python change_path.py FROM_PATH TO_PATH")
exit()
from_path = sys.argv[1]
to_path = sys.argv[2]
if (from_path.endswith(os.sep) and not to_path.endswith(os.sep)) or (
not from_path.endswith(os.sep) and to_path.endswith(os.sep)):
print("One path has a trailing slash and the other doesn't. That's probably not right. Check your inputs.")
exit()
conn = sqlite3.connect('/data/navidrome.db')
main(conn, from_path, to_path)
conn.commit()
conn.close()
if __name__ == '__main__':
cli_entrypoint()

50
nmv.py Executable file
View File

@ -0,0 +1,50 @@
#!/bin/python3
import sqlite3
import uuid
from pathlib import Path
from contextlib import closing
import change_path
import shared
def main() -> None:
import argparse
parser = argparse.ArgumentParser(description='Merge tracks from the library into any of ones in new_dir')
parser.add_argument('--nbd', type=Path, default=Path('/data/navidrome.db'), help='Path to navidrome db')
parser.add_argument('new_dir', type=Path)
parser.add_argument('--exchange', action='store_true', default=False,
help='Should we exchange metadata of files or leave to die metadata of an old file')
args = parser.parse_args()
conn: sqlite3.Connection
with closing(sqlite3.connect(args.nbd)) as conn:
for file_path in args.new_dir.glob('**/*'):
print(f'Handling file {str(file_path)}')
current_song_response = change_path.get_media_by_path(conn, str(file_path))
if len(current_song_response) == 0:
continue
current_song = current_song_response[0]
search_result = shared.get_media_by_metadata(current_song.artist, current_song.track_name, conn)
if search_result is not None:
interaction = input(f'Do you want to move these files {str(search_result.path)!r} --> {str(file_path)!r} (Y/n)?')
if interaction.lower() in ('', 'y', 'yes'):
print(f"Performing {'exchange' if args.exchange else 'move'}")
tmp_name = uuid.uuid4().hex
# Firstly, we need to move target file to temp location
change_path.main(conn=conn, from_path=str(file_path), to_path=tmp_name)
# Secondly, move first file to place of target file
change_path.main(conn, from_path=str(search_result.path), to_path=str(file_path))
if args.exchange:
# Thirdly, optional, move target file to place of first file
change_path.main(conn, from_path=tmp_name, to_path=str(search_result.path))
conn.commit()
if __name__ == '__main__':
main()

67
shared.py Executable file
View File

@ -0,0 +1,67 @@
import sqlite3
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class Song:
id: str
artist: str
track_name: str
path: Path
def get_media_by_metadata(artist: str, track_name: str, conn: sqlite3.Connection) -> Song | None:
r = conn.execute(
'select id, title, artist, path from media_file where lower(title) = lower(?) and lower(artist) = lower(?);',
(track_name, artist)).fetchall()
results = list()
for res in r:
results.append(Song(*res))
if len(results) == 0:
return None
elif len(results) == 1:
return results[0]
else:
raise RuntimeError("Found multiple matches for search criteria" + str(results))
def path_clause(path: str):
if path.endswith('/'):
return 'LIKE', '%'
else:
return '=', ''
def get_media_by_path(conn: sqlite3.Connection, _path: str | Path) -> tuple[Song, ...]:
path = str(_path)
clause_and_suffix = path_clause(path)
clause = clause_and_suffix[0]
path = path + clause_and_suffix[1]
sql = f"""
SELECT id, title, artist, path FROM media_file
WHERE path {clause} ?
ORDER BY path;"""
return tuple(
Song(*res)
for res in conn.execute(sql, (path,)).fetchall()
)
# def get_track_metadata(path: Path) -> Song | None: # ffmpeg edition
# if not path.exists():
# raise RuntimeError(f"{str(path)!r} doesn't exists")
#
# with Popen(['ffprobe', '-show_format', '-print_format', 'json', str(path)], stdout=PIPE, stderr=PIPE) as p:
# stdout, stderr = p.communicate()
# data = json.loads(stdout)
# try:
# return Song(artist=data['format']['tags']['Artist'], track_name=data['format']['tags']['Title'], path=path)
#
# except Exception as e:
# print(f"Failed to get metadata for file {path.name}")
# raise e

39493
tests/database.sql Normal file

File diff suppressed because one or more lines are too long

55
tests/test_change_path.py Executable file
View File

@ -0,0 +1,55 @@
import sqlite3
import unittest
from pathlib import Path
import change_path
import shared
class TestChangePath(unittest.TestCase):
SAMPLE_TRACK_PATH_1 = '/music/music/Def Leppard/01 - Studio Albums/1980 - On Through The Night/03. Sorrow Is A Woman.mp3'
SAMPLE_TRACK_PATH_2 = '/music/music/Def Leppard/01 - Studio Albums/1980 - On Through The Night/05. Satellite.mp3'
NOTHING = '/nothing'
def setUp(self) -> None:
self.conn = sqlite3.connect(':memory:')
with open(Path(__file__).parent.resolve() / 'database.sql', 'r', encoding='utf-8') as dump:
self.conn.executescript(dump.read())
def test_move_file_into_existing_one(self):
"""
Make sure we can't move a track to path of another, existing in db, track as it would violate unique
constraint on media_file.id (given it is md5(file_path)
:return:
"""
with self.assertRaises(sqlite3.IntegrityError, msg='Expected a unique constraint violation') as context:
change_path.main(
self.conn,
from_path=self.SAMPLE_TRACK_PATH_1,
to_path=self.SAMPLE_TRACK_PATH_2
)
self.assertIn('UNIQUE constraint failed: media_file.id', context.exception.args)
self.conn.rollback()
def test_move_file_into_non_existing_one(self):
self.assertEqual(1, len(
shared.get_media_by_path(self.conn, self.SAMPLE_TRACK_PATH_1)
), 'expected to find one song in db by path')
change_path.main(
self.conn,
from_path=self.SAMPLE_TRACK_PATH_1,
to_path=self.NOTHING
)
self.conn.commit()
self.assertEqual(0, len(
shared.get_media_by_path(self.conn, self.SAMPLE_TRACK_PATH_1)
), 'expected to not find a song by old path')
self.assertEqual(1, len(
shared.get_media_by_path(self.conn, self.NOTHING)
), 'expected to find moved song by a new path')