import hashlib
import logging
import shutil
import threading
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, Optional, Sequence, Set, Tuple, cast
import peewee
from gi.repository import Gtk
from peewee import fn, prefetch
from sublime_music.adapters import api_objects as API
from .. import (
AlbumSearchQuery,
CacheMissError,
CachingAdapter,
ConfigParamDescriptor,
ConfigurationStore,
ConfigureServerForm,
SongCacheStatus,
UIInfo,
)
from . import models
KEYS = CachingAdapter.CachedDataKey
[docs]class FilesystemAdapter(CachingAdapter):
"""
Defines an adapter which retrieves its data from the local filesystem.
"""
# Configuration and Initialization Properties
# ==================================================================================
[docs] @staticmethod
def get_ui_info() -> UIInfo:
return UIInfo(
name="Local Filesystem",
description="Add a directory on your local filesystem",
icon_basename="folder-music",
)
[docs] @staticmethod
def migrate_configuration(config_store: ConfigurationStore):
pass
[docs] def __init__(self, config: dict, data_directory: Path, is_cache: bool = False):
self.data_directory = data_directory
self.cover_art_dir = self.data_directory.joinpath("cover_art")
self.music_dir = self.data_directory.joinpath("music")
self.cover_art_dir.mkdir(parents=True, exist_ok=True)
self.music_dir.mkdir(parents=True, exist_ok=True)
self.is_cache = is_cache
self.db_write_lock: threading.Lock = threading.Lock()
database_filename = data_directory.joinpath("cache.db")
models.database.init(database_filename)
models.database.connect()
with self.db_write_lock, models.database.atomic():
models.database.create_tables(models.ALL_TABLES)
self._migrate_db()
[docs] def initial_sync(self):
# TODO (#188) this is where scanning the fs should potentially happen?
pass
[docs] def shutdown(self):
logging.info("Shutdown complete")
# Database Migration
# ==================================================================================
def _migrate_db(self):
pass
# Usage and Availability Properties
# ==================================================================================
can_be_cached = False # Can't be cached (there's no need).
can_be_ground_truth = False # TODO (#188)
is_networked = False # Doesn't access the network.
[docs] def on_offline_mode_change(self, _: bool):
pass
# TODO (#200) make these dependent on cache state. Need to do this kinda efficiently
can_get_cover_art_uri = True
can_get_song_file_uri = True
can_get_song_details = True
can_get_song_rating = True
can_get_artist = True
can_get_albums = True
can_get_album = True
can_get_ignored_articles = True
can_get_directory = True
can_search = True
def _can_get_key(self, cache_key: CachingAdapter.CachedDataKey) -> bool:
if not self.is_cache:
return True
# As long as there's something in the cache (even if it's not valid) it may be
# returned in a cache miss error.
query = models.CacheInfo.select().where(models.CacheInfo.cache_key == cache_key)
return query.count() > 0
@property
def can_get_playlists(self) -> bool:
return self._can_get_key(KEYS.PLAYLISTS)
@property
def can_get_playlist_details(self) -> bool:
return self._can_get_key(KEYS.PLAYLIST_DETAILS)
@property
def can_get_artists(self) -> bool:
return self._can_get_key(KEYS.ARTISTS)
@property
def can_get_genres(self) -> bool:
return self._can_get_key(KEYS.GENRES)
@property
def can_set_song_rating(self) -> bool:
return self._can_get_key(KEYS.SONG_RATING)
supported_schemes = ("file",)
supported_artist_query_types = {
AlbumSearchQuery.Type.RANDOM,
AlbumSearchQuery.Type.NEWEST,
AlbumSearchQuery.Type.FREQUENT,
AlbumSearchQuery.Type.RECENT,
AlbumSearchQuery.Type.STARRED,
AlbumSearchQuery.Type.ALPHABETICAL_BY_NAME,
AlbumSearchQuery.Type.ALPHABETICAL_BY_ARTIST,
AlbumSearchQuery.Type.YEAR_RANGE,
AlbumSearchQuery.Type.GENRE,
}
# Data Helper Methods
# ==================================================================================
def _get_list(
self,
model: Any,
cache_key: CachingAdapter.CachedDataKey,
ignore_cache_miss: bool = False,
where_clauses: Tuple[Any, ...] | None = None,
order_by: Any = None,
) -> Sequence:
result = model.select()
if where_clauses is not None:
result = result.where(*where_clauses)
if order_by:
result = result.order_by(order_by)
if self.is_cache and not ignore_cache_miss:
# Determine if the adapter has ingested data for this key before, and if
# not, cache miss.
if not models.CacheInfo.get_or_none(
models.CacheInfo.valid == True, # noqa: 712
models.CacheInfo.cache_key == cache_key,
):
raise CacheMissError(partial_data=result)
return result
def _get_object_details(
self, model: Any, id: str, cache_key: CachingAdapter.CachedDataKey
) -> Any:
obj = model.get_or_none(model.id == id)
# Handle the case that this is the ground truth adapter.
if not self.is_cache:
if not obj:
raise Exception(f"{model} with id={id} does not exist")
return obj
# If we haven't ingested data for this item before, or it's been invalidated,
# raise a CacheMissError with the partial data.
cache_info = models.CacheInfo.get_or_none(
models.CacheInfo.cache_key == cache_key,
models.CacheInfo.parameter == id,
models.CacheInfo.valid == True, # noqa: 712
)
if not cache_info:
raise CacheMissError(partial_data=obj)
return obj
def _compute_song_filename(self, cache_info: models.CacheInfo) -> Path:
try:
if path_str := cache_info.path:
# Make sure that the path is somewhere in the cache directory and a
# malicious server (or MITM attacker) isn't trying to override files in
# other parts of the system.
path = self.music_dir.joinpath(str(path_str))
if self.music_dir in path.parents:
return path
except Exception:
pass
# Fall back to using the song file hash as the filename. This shouldn't happen
# with good servers, but just to be safe.
return self.music_dir.joinpath(str(cache_info.file_hash))
# Data Retrieval Methods
# ==================================================================================
[docs] def get_cached_statuses(self, song_ids: Sequence[str]) -> Dict[str, SongCacheStatus]:
def compute_song_cache_status(song: models.Song) -> SongCacheStatus:
try:
file = cast(models.CacheInfo, song.file)
if self._compute_song_filename(file).exists():
if file.valid:
if file.cache_permanently:
return SongCacheStatus.PERMANENTLY_CACHED
return SongCacheStatus.CACHED
# The file is on disk, but marked as stale.
return SongCacheStatus.CACHED_STALE
except Exception:
pass
return SongCacheStatus.NOT_CACHED
cached_statuses = {song_id: SongCacheStatus.NOT_CACHED for song_id in song_ids}
try:
file_models = models.CacheInfo.select().where(
models.CacheInfo.cache_key == KEYS.SONG_FILE
)
song_models = models.Song.select().where(models.Song.id.in_(song_ids))
cached_statuses.update(
{s.id: compute_song_cache_status(s) for s in prefetch(song_models, file_models)}
)
except Exception:
pass
return cached_statuses
_playlists = None
[docs] def get_playlists(self, ignore_cache_miss: bool = False) -> Sequence[API.Playlist]:
if self._playlists is not None:
return self._playlists
self._playlists = self._get_list(
models.Playlist,
CachingAdapter.CachedDataKey.PLAYLISTS,
ignore_cache_miss=ignore_cache_miss,
order_by=fn.LOWER(models.Playlist.name),
)
return self._playlists
[docs] def get_playlist_details(self, playlist_id: str) -> API.Playlist:
return self._get_object_details(
models.Playlist, playlist_id, CachingAdapter.CachedDataKey.PLAYLIST_DETAILS
)
[docs] def get_cover_art_uri(self, cover_art_id: str, scheme: str, size: int) -> str:
cover_art = models.CacheInfo.get_or_none(
models.CacheInfo.cache_key == CachingAdapter.CachedDataKey.COVER_ART_FILE,
models.CacheInfo.parameter == cover_art_id,
)
if cover_art:
filename = self.cover_art_dir.joinpath(str(cover_art.file_hash))
if filename.exists():
if cover_art.valid:
return str(filename)
else:
raise CacheMissError(partial_data=str(filename))
raise CacheMissError()
[docs] def get_song_file_uri(self, song_id: str, schemes: Iterable[str]) -> str:
song = models.Song.get_or_none(models.Song.id == song_id)
if not song:
if self.is_cache:
raise CacheMissError()
else:
raise Exception(f"Song {song_id} does not exist.")
try:
if (song_file := song.file) and (filename := self._compute_song_filename(song_file)):
if filename.exists():
file_uri = f"file://{filename}"
if song_file.valid:
return file_uri
else:
raise CacheMissError(partial_data=file_uri)
except peewee.DoesNotExist:
pass
raise CacheMissError()
[docs] def get_song_details(self, song_id: str) -> API.Song:
return self._get_object_details(
models.Song,
song_id,
CachingAdapter.CachedDataKey.SONG,
)
[docs] def get_artists(self, ignore_cache_miss: bool = False) -> Sequence[API.Artist]:
return self._get_list(
models.Artist,
CachingAdapter.CachedDataKey.ARTISTS,
ignore_cache_miss=ignore_cache_miss,
where_clauses=(~(models.Artist.id.startswith("invalid:")),),
)
[docs] def get_artist(self, artist_id: str) -> API.Artist:
return self._get_object_details(
models.Artist, artist_id, CachingAdapter.CachedDataKey.ARTIST
)
[docs] def get_albums(
self,
query: AlbumSearchQuery,
sort_direction: str = "ascending"
# TODO (#208) deal with sort dir here?
) -> Sequence[API.Album]:
strhash = query.strhash()
query_result = models.AlbumQueryResult.get_or_none(
models.AlbumQueryResult.query_hash == strhash
)
# If we've cached the query result, then just return it. If it's stale, then
# return the old value as a cache miss error.
if query_result and (
cache_info := models.CacheInfo.get_or_none(
models.CacheInfo.cache_key == CachingAdapter.CachedDataKey.ALBUMS,
models.CacheInfo.parameter == strhash,
)
):
if cache_info.valid:
return query_result.albums
else:
raise CacheMissError(partial_data=query_result.albums)
# If we haven't ever cached the query result, try to construct one, and return
# it as a CacheMissError result.
sql_query = models.Album.select().where(~(models.Album.id.startswith("invalid:")))
Type = AlbumSearchQuery.Type
if query.type == Type.GENRE:
assert query.genre
genre_name = genre.name if (genre := query.genre) else None
sql_query = {
Type.RANDOM: sql_query.order_by(fn.Random()),
Type.NEWEST: sql_query.order_by(models.Album.created.desc()),
Type.FREQUENT: sql_query.order_by(models.Album.play_count.desc()),
Type.STARRED: sql_query.where(models.Album.starred.is_null(False)).order_by(
models.Album.name
),
Type.ALPHABETICAL_BY_NAME: sql_query.order_by(models.Album.name),
Type.ALPHABETICAL_BY_ARTIST: sql_query.order_by(models.Album.artist.name),
Type.YEAR_RANGE: sql_query.where(
models.Album.year.between(*query.year_range)
).order_by(models.Album.year, models.Album.name),
Type.GENRE: sql_query.where(models.Album.genre == genre_name).order_by(
models.Album.name
),
}.get(query.type)
raise CacheMissError(partial_data=sql_query)
[docs] def get_all_albums(self) -> Sequence[API.Album]:
return self._get_list(
models.Album,
CachingAdapter.CachedDataKey.ALBUMS,
ignore_cache_miss=True,
where_clauses=(
~(models.Album.id.startswith("invalid:")),
models.Album.artist.is_null(False),
),
)
[docs] def get_album(self, album_id: str) -> API.Album:
return self._get_object_details(models.Album, album_id, CachingAdapter.CachedDataKey.ALBUM)
[docs] def get_ignored_articles(self) -> Set[str]:
return {
i.name
for i in self._get_list(
models.IgnoredArticle, CachingAdapter.CachedDataKey.IGNORED_ARTICLES
)
}
[docs] def get_directory(self, directory_id: str) -> API.Directory:
return self._get_object_details(
models.Directory, directory_id, CachingAdapter.CachedDataKey.DIRECTORY
)
[docs] def get_genres(self) -> Sequence[API.Genre]:
return self._get_list(models.Genre, CachingAdapter.CachedDataKey.GENRES)
[docs] def search(self, query: str) -> API.SearchResult:
search_result = API.SearchResult(query)
search_result.add_results("albums", self.get_all_albums())
search_result.add_results("artists", self.get_artists(ignore_cache_miss=True))
search_result.add_results(
"songs",
self._get_list(
models.Song,
CachingAdapter.CachedDataKey.SONG,
ignore_cache_miss=True,
where_clauses=(models.Song.artist.is_null(False),),
),
)
search_result.add_results(
"playlists",
self.get_playlists(ignore_cache_miss=True),
)
return search_result
# Data Ingestion Methods
# ==================================================================================
def _strhash(self, string: str) -> str:
return hashlib.sha1(bytes(string, "utf8")).hexdigest()
[docs] def ingest_new_data(
self,
data_key: CachingAdapter.CachedDataKey,
param: Optional[str],
data: Any,
):
assert self.is_cache, "FilesystemAdapter is not in cache mode!"
# Wrap the actual ingestion function in a database lock, and an atomic
# transaction.
with self.db_write_lock, models.database.atomic():
self._do_ingest_new_data(data_key, param, data)
[docs] def invalidate_data(self, key: CachingAdapter.CachedDataKey, param: Optional[str]):
assert self.is_cache, "FilesystemAdapter is not in cache mode!"
# Wrap the actual ingestion function in a database lock, and an atomic
# transaction.
with self.db_write_lock, models.database.atomic():
self._do_invalidate_data(key, param)
[docs] def delete_data(self, key: CachingAdapter.CachedDataKey, param: Optional[str]):
assert self.is_cache, "FilesystemAdapter is not in cache mode!"
# Wrap the actual ingestion function in a database lock, and an atomic
# transaction.
with self.db_write_lock, models.database.atomic():
self._do_delete_data(key, param)
def _do_ingest_new_data(
self,
data_key: CachingAdapter.CachedDataKey,
param: Optional[str],
data: Any,
partial: bool = False,
) -> Any:
# TODO (#201): this entire function is not exactly efficient due to the nested
# dependencies and everything. I'm not sure how to improve it, and I'm not sure
# if it needs improving at this point.
logging.debug(f"_do_ingest_new_data param={param} data_key={data_key} data={data}")
def getattrs(obj: Any, keys: Iterable[str]) -> Dict[str, Any]:
return {k: getattr(obj, k) for k in keys}
def setattrs(obj: Any, data: Dict[str, Any]):
for k, v in data.items():
if v is not None:
setattr(obj, k, v)
def compute_file_hash(filename: str) -> str:
file_hash = hashlib.sha1()
with open(filename, "rb") as f:
while chunk := f.read(8192):
file_hash.update(chunk)
return file_hash.hexdigest()
return_val = None
# Set the cache info.
now = datetime.now()
cache_info, cache_info_created = models.CacheInfo.get_or_create(
cache_key=(
# In the case of SONG_FILE_PERMANENT, we have to use SONG_FILE as the
# key in the database so everything matches up when querying.
data_key
if data_key != KEYS.SONG_FILE_PERMANENT
else KEYS.SONG_FILE
),
parameter=param,
defaults={
"cache_key": data_key,
"parameter": param,
"last_ingestion_time": now,
# If it's partial data, then set it to be invalid so it will only be
# used in the event that the ground truth adapter can't service the
# request.
"valid": not partial,
},
)
if not cache_info_created:
cache_info.valid = cache_info.valid or not partial
cache_info.last_ingestion_time = now
cache_info.save()
if data_key == KEYS.ALBUM:
album = cast(API.Album, data)
album_id = album.id or f"invalid:{self._strhash(album.name)}"
album_data = {
"id": album_id,
**getattrs(
album,
[
"name",
"created",
"duration",
"play_count",
"song_count",
"starred",
"year",
],
),
"genre": (
self._do_ingest_new_data(KEYS.GENRE, None, g) if (g := album.genre) else None
),
"artist": (
self._do_ingest_new_data(KEYS.ARTIST, ar.id, ar, partial=True)
if (ar := album.artist)
else None
),
"_songs": (
[self._do_ingest_new_data(KEYS.SONG, s.id, s) for s in album.songs or []]
if not partial
else None
),
"_cover_art": (
self._do_ingest_new_data(KEYS.COVER_ART_FILE, album.cover_art, data=None)
if album.cover_art
else None
),
}
db_album, created = models.Album.get_or_create(id=album_id, defaults=album_data)
if not created:
setattrs(db_album, album_data)
db_album.save()
return_val = db_album
elif data_key == KEYS.ALBUMS:
albums = [self._do_ingest_new_data(KEYS.ALBUM, a.id, a, partial=True) for a in data]
album_query_result, created = models.AlbumQueryResult.get_or_create(
query_hash=param, defaults={"query_hash": param, "albums": albums}
)
if not created:
album_query_result.albums = albums
try:
album_query_result.save()
except ValueError:
# No save necessary.
pass
elif data_key == KEYS.ARTIST:
# Ingest similar artists.
artist = cast(API.Artist, data)
if artist.similar_artists:
models.SimilarArtist.delete().where(
models.SimilarArtist.similar_artist.not_in(
[sa.id for sa in artist.similar_artists or []]
),
models.Artist == artist.id,
).execute()
models.SimilarArtist.insert_many(
[
{"artist": artist.id, "similar_artist": a.id, "order": i}
for i, a in enumerate(artist.similar_artists or [])
]
).on_conflict_replace().execute()
artist_id = artist.id or f"invalid:{self._strhash(artist.name)}"
artist_data = {
"id": artist_id,
**getattrs(
artist,
[
"name",
"album_count",
"starred",
"biography",
"music_brainz_id",
"last_fm_url",
],
),
"albums": [
self._do_ingest_new_data(KEYS.ALBUM, a.id, a, partial=True)
for a in artist.albums or []
],
"_artist_image_url": (
self._do_ingest_new_data(
KEYS.COVER_ART_FILE, artist.artist_image_url, data=None
)
if artist.artist_image_url
else None
),
}
db_artist, created = models.Artist.get_or_create(id=artist_id, defaults=artist_data)
if not created:
setattrs(db_artist, artist_data)
db_artist.save()
return_val = db_artist
elif data_key == KEYS.ARTISTS:
for a in data:
self._do_ingest_new_data(KEYS.ARTIST, a.id, a, partial=True)
models.Artist.delete().where(
models.Artist.id.not_in([a.id for a in data])
& ~models.Artist.id.startswith("invalid")
).execute()
elif data_key == KEYS.COVER_ART_FILE:
cache_info.file_id = param
if data is not None:
file_hash = compute_file_hash(data)
cache_info.file_hash = file_hash
# Copy the actual cover art file
shutil.copy(str(data), str(self.cover_art_dir.joinpath(file_hash)))
elif data_key == KEYS.DIRECTORY:
api_directory = cast(API.Directory, data)
directory_data: Dict[str, Any] = getattrs(api_directory, ["id", "name", "parent_id"])
if not partial:
directory_data["directory_children"] = []
directory_data["song_children"] = []
for c in api_directory.children:
if hasattr(c, "children"): # directory
directory_data["directory_children"].append(
self._do_ingest_new_data(KEYS.DIRECTORY, c.id, c, partial=True)
)
else:
directory_data["song_children"].append(
self._do_ingest_new_data(KEYS.SONG, c.id, c)
)
directory, created = models.Directory.get_or_create(
id=api_directory.id, defaults=directory_data
)
if not created:
setattrs(directory, directory_data)
directory.save()
return_val = directory
elif data_key == KEYS.GENRES:
for g in data:
self._do_ingest_new_data(KEYS.GENRE, None, g)
elif data_key == KEYS.GENRE:
api_genre = cast(API.Genre, data)
genre_data = getattrs(api_genre, ["name", "song_count", "album_count"])
genre, created = models.Genre.get_or_create(name=api_genre.name, defaults=genre_data)
if not created:
setattrs(genre, genre_data)
genre.save()
return_val = genre
elif data_key == KEYS.IGNORED_ARTICLES:
models.IgnoredArticle.insert_many(
{"name": s} for s in data
).on_conflict_replace().execute()
models.IgnoredArticle.delete().where(models.IgnoredArticle.name.not_in(data)).execute()
elif data_key == KEYS.PLAYLIST_DETAILS:
api_playlist = cast(API.Playlist, data)
playlist_data: Dict[str, Any] = {
**getattrs(
api_playlist,
[
"id",
"name",
"song_count",
"duration",
"created",
"changed",
"comment",
"owner",
"public",
],
),
"_cover_art": (
self._do_ingest_new_data(KEYS.COVER_ART_FILE, api_playlist.cover_art, None)
if api_playlist.cover_art
else None
),
}
if not partial:
# If it's partial, then don't ingest the songs.
playlist_data["_songs"] = [
self._do_ingest_new_data(KEYS.SONG, s.id, s) for s in api_playlist.songs
]
playlist, playlist_created = models.Playlist.get_or_create(
id=playlist_data["id"], defaults=playlist_data
)
# Update the values if the playlist already existed.
if not playlist_created:
setattrs(playlist, playlist_data)
playlist.save()
return_val = playlist
elif data_key == KEYS.PLAYLISTS:
self._playlists = None
for p in data:
self._do_ingest_new_data(KEYS.PLAYLIST_DETAILS, p.id, p, partial=True)
models.Playlist.delete().where(
models.Playlist.id.not_in([p.id for p in data])
).execute()
elif data_key == KEYS.SEARCH_RESULTS:
data = cast(API.SearchResult, data)
for a in data._artists.values():
self._do_ingest_new_data(KEYS.ARTIST, a.id, a, partial=True)
for a in data._albums.values():
self._do_ingest_new_data(KEYS.ALBUM, a.id, a, partial=True)
for s in data._songs.values():
self._do_ingest_new_data(KEYS.SONG, s.id, s, partial=True)
for p in data._playlists.values():
self._do_ingest_new_data(KEYS.PLAYLIST_DETAILS, p.id, p, partial=True)
elif data_key == KEYS.SONG:
api_song = cast(API.Song, data)
song_data = getattrs(
api_song,
[
"id",
"title",
"track",
"year",
"duration",
"parent_id",
"disc_number",
"user_rating",
],
)
song_data["genre"] = (
self._do_ingest_new_data(KEYS.GENRE, None, g) if (g := api_song.genre) else None
)
song_data["artist"] = (
self._do_ingest_new_data(KEYS.ARTIST, ar.id, ar, partial=True)
if (ar := api_song.artist)
else None
)
song_data["album"] = (
self._do_ingest_new_data(KEYS.ALBUM, al.id, al, partial=True)
if (al := api_song.album)
else None
)
song_data["_cover_art"] = (
self._do_ingest_new_data(
KEYS.COVER_ART_FILE,
api_song.cover_art,
data=None,
)
if api_song.cover_art
else None
)
song_data["file"] = (
self._do_ingest_new_data(
KEYS.SONG_FILE,
api_song.id,
data=(api_song.path, None, api_song.size),
)
if api_song.path
else None
)
song, created = models.Song.get_or_create(id=song_data["id"], defaults=song_data)
if not created:
setattrs(song, song_data)
song.save()
return_val = song
elif data_key == KEYS.SONG_FILE:
cache_info.file_id = param
elif data_key == KEYS.SONG_FILE_PERMANENT:
cache_info.cache_permanently = True
# Special handling for Song
if data_key == KEYS.SONG_FILE and data:
path, buffer_filename, size = data
if path:
cache_info.path = path
if size:
cache_info.size = size
if buffer_filename:
cache_info.file_hash = compute_file_hash(buffer_filename)
# Copy the actual song file from the download buffer dir to the cache
# dir.
filename = self._compute_song_filename(cache_info)
filename.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(buffer_filename), str(filename))
elif data_key == KEYS.SONG_RATING:
song = models.Song.get_by_id(param)
song.user_rating = data
song.save()
cache_info.save()
return return_val if return_val is not None else cache_info
def _do_invalidate_data(
self,
data_key: CachingAdapter.CachedDataKey,
param: Optional[str],
):
logging.debug(f"_do_invalidate_data param={param} data_key={data_key}")
models.CacheInfo.update({"valid": False}).where(
models.CacheInfo.cache_key == data_key, models.CacheInfo.parameter == param
).execute()
if data_key == KEYS.ALBUM:
# Invalidate the corresponding cover art.
if album := models.Album.get_or_none(models.Album.id == param):
self._do_invalidate_data(KEYS.COVER_ART_FILE, album.cover_art)
elif data_key == KEYS.ARTIST:
# Invalidate the corresponding cover art and albums.
if artist := models.Artist.get_or_none(models.Artist.id == param):
self._do_invalidate_data(KEYS.COVER_ART_FILE, artist.artist_image_url)
for album in artist.albums or []:
self._do_invalidate_data(CachingAdapter.CachedDataKey.ALBUM, album.id)
elif data_key == KEYS.PLAYLIST_DETAILS:
# Invalidate the corresponding cover art.
if playlist := models.Playlist.get_or_none(models.Playlist.id == param):
self._do_invalidate_data(KEYS.COVER_ART_FILE, playlist.cover_art)
elif data_key == KEYS.SONG_FILE:
# Invalidate the corresponding cover art.
if song := models.Song.get_or_none(models.Song.id == param):
self._do_invalidate_data(KEYS.COVER_ART_FILE, song.cover_art)
def _do_delete_data(self, data_key: CachingAdapter.CachedDataKey, param: Optional[str]):
logging.debug(f"_do_delete_data param={param} data_key={data_key}")
cache_info = models.CacheInfo.get_or_none(
models.CacheInfo.cache_key == data_key,
models.CacheInfo.parameter == param,
)
if data_key == KEYS.COVER_ART_FILE:
if cache_info:
self.cover_art_dir.joinpath(str(cache_info.file_hash)).unlink(missing_ok=True)
elif data_key == KEYS.PLAYLIST_DETAILS:
# Delete the playlist and corresponding cover art.
if playlist := models.Playlist.get_or_none(models.Playlist.id == param):
if cover_art := playlist.cover_art:
self._do_delete_data(KEYS.COVER_ART_FILE, cover_art)
playlist.delete_instance()
elif data_key == KEYS.SONG_FILE:
if cache_info:
self._compute_song_filename(cache_info).unlink(missing_ok=True)
elif data_key == KEYS.ALL_SONGS:
shutil.rmtree(str(self.music_dir))
shutil.rmtree(str(self.cover_art_dir))
self.music_dir.mkdir(parents=True, exist_ok=True)
self.cover_art_dir.mkdir(parents=True, exist_ok=True)
models.CacheInfo.update({"valid": False}).where(
models.CacheInfo.cache_key == KEYS.SONG_FILE
).execute()
models.CacheInfo.update({"valid": False}).where(
models.CacheInfo.cache_key == KEYS.COVER_ART_FILE
).execute()
elif data_key == KEYS.EVERYTHING:
self._do_delete_data(KEYS.ALL_SONGS, None)
for table in models.ALL_TABLES:
table.truncate_table()
if cache_info:
cache_info.valid = False
cache_info.save()