import logging
import os
import random
import shutil
import sys
from concurrent.futures import Future
from datetime import timedelta
from functools import partial
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple
from urllib.parse import urlparse
import bleach
try:
import osxmmkeys
tap_imported = True
except Exception:
tap_imported = False
from gi.repository import Gdk, GdkPixbuf, Gio, GLib, Gtk
try:
import gi
gi.require_version("Notify", "0.7")
from gi.repository import Notify # mypy: ignore
glib_notify_exists = True
except Exception:
# I really don't care what kind of exception it is, all that matters is the
# import failed for some reason.
logging.warning("Unable to import Notify from GLib. Notifications will be disabled.")
glib_notify_exists = False
from .adapters import (
AdapterManager,
AlbumSearchQuery,
CacheMissError,
DownloadProgress,
Result,
SongCacheStatus,
)
from .adapters.api_objects import Playlist, PlayQueue, Song
from .config import AppConfiguration, ProviderConfiguration
from .dbus import DBusManager, dbus_propagate
from .players import PlayerDeviceEvent, PlayerEvent, PlayerManager
from .ui.configure_provider import ConfigureProviderDialog
from .ui.main import MainWindow
from .ui.state import RepeatType, UIState
from .util import resolve_path
[docs]class SublimeMusicApp(Gtk.Application):
[docs] def __init__(self, config_file: Path):
super().__init__(application_id="app.sublimemusic.SublimeMusic")
if glib_notify_exists:
Notify.init("Sublime Music")
self.window: Optional[Gtk.Window] = None
self.app_config = AppConfiguration.load_from_file(config_file)
self.dbus_manager: Optional[DBusManager] = None
self.connect("shutdown", self.on_app_shutdown)
player_manager: PlayerManager
exiting: bool = False
[docs] def do_startup(self):
Gtk.Application.do_startup(self)
def add_action(name: str, fn: Callable, parameter_type: str | None = None):
"""Registers an action with the application."""
if type(parameter_type) == str:
parameter_type = GLib.VariantType(parameter_type)
action = Gio.SimpleAction.new(name, parameter_type)
action.connect("activate", fn)
self.add_action(action)
# Add action for menu items.
add_action("add-new-music-provider", self.on_add_new_music_provider)
add_action("edit-current-music-provider", self.on_edit_current_music_provider)
add_action("switch-music-provider", self.on_switch_music_provider, parameter_type="s")
add_action("remove-music-provider", self.on_remove_music_provider, parameter_type="s")
# Add actions for player controls
add_action("play-pause", self.on_play_pause)
add_action("next-track", self.on_next_track)
add_action("prev-track", self.on_prev_track)
add_action("repeat-press", self.on_repeat_press)
add_action("shuffle-press", self.on_shuffle_press)
# Navigation actions.
add_action("play-next", self.on_play_next, parameter_type="as")
add_action("add-to-queue", self.on_add_to_queue, parameter_type="as")
add_action("go-to-album", self.on_go_to_album, parameter_type="s")
add_action("go-to-artist", self.on_go_to_artist, parameter_type="s")
add_action("browse-to", self.browse_to, parameter_type="s")
add_action("go-to-playlist", self.on_go_to_playlist, parameter_type="s")
add_action("go-online", self.on_go_online)
add_action("refresh-devices", self.on_refresh_devices)
add_action(
"refresh-window",
lambda *a: self.on_refresh_window(None, {}, True),
)
add_action("mute-toggle", self.on_mute_toggle)
add_action(
"update-play-queue-from-server",
lambda a, p: self.update_play_state_from_server(),
)
if tap_imported:
self.tap = osxmmkeys.Tap()
self.tap.on("play_pause", self.on_play_pause)
self.tap.on("next_track", self.on_next_track)
self.tap.on("prev_track", self.on_prev_track)
self.tap.start()
[docs] def do_activate(self):
# We only allow a single window and raise any existing ones
if self.window:
self.window.present()
return
# Load the state for the server, if it exists.
self.app_config.load_state()
# If there is no current provider, use the first one if there are any
# configured, and if none are configured, then show the dialog to create a new
# one.
if self.app_config.provider is None:
if len(self.app_config.providers) == 0:
self.show_configure_servers_dialog()
# If they didn't add one with the dialog, close the window.
if len(self.app_config.providers) == 0:
if self.window:
self.window.close()
return
AdapterManager.reset(self.app_config, self.on_song_download_progress)
# Configure Icons
default_icon_theme = Gtk.IconTheme.get_default()
for adapter in AdapterManager.available_adapters:
if icon_dir := adapter.get_ui_info().icon_dir:
default_icon_theme.append_search_path(str(icon_dir))
icon_dirs = [resolve_path("ui/icons"), resolve_path("adapters/icons")]
for icon_dir in icon_dirs:
default_icon_theme.append_search_path(str(icon_dir))
# Windows are associated with the application when the last one is
# closed the application shuts down.
self.window = MainWindow(application=self, title="Sublime Music")
# Configure the CSS provider so that we can style elements on the
# window.
css_provider = Gtk.CssProvider()
css_provider.load_from_path(str(resolve_path("ui/app_styles.css")))
context = Gtk.StyleContext()
screen = Gdk.Screen.get_default()
context.add_provider_for_screen(screen, css_provider, Gtk.STYLE_PROVIDER_PRIORITY_USER)
self.window.show_all()
self.window.present()
# Connect after we know there's a server configured.
self.window.stack.connect("notify::visible-child", self.on_stack_change)
self.window.connect("song-clicked", self.on_song_clicked)
self.window.connect("songs-removed", self.on_songs_removed)
self.window.connect("refresh-window", self.on_refresh_window)
self.window.connect("notification-closed", self.on_notification_closed)
self.window.connect("go-to", self.on_window_go_to)
self.window.connect("key-press-event", self.on_window_key_press)
self.window.player_controls.connect("song-scrub", self.on_song_scrub)
self.window.player_controls.connect("device-update", self.on_device_update)
self.window.player_controls.connect("volume-change", self.on_volume_change)
self.window.player_controls.connect("song-rated", self.on_current_song_rated)
# Configure the players
self.last_play_queue_update = timedelta(0)
self.loading_state = False
self.should_scrobble_song = False
def on_timepos_change(value: Optional[float]):
if self.loading_state or not self.window or not self.app_config.state.current_song:
return
if value is None:
self.last_play_queue_update = timedelta(0)
return
self.app_config.state.song_progress = timedelta(seconds=value)
GLib.idle_add(
self.window.player_controls.update_scrubber,
self.app_config.state.song_progress,
self.app_config.state.current_song.duration,
self.app_config.state.song_stream_cache_progress,
)
if (self.last_play_queue_update + timedelta(15)).total_seconds() <= value:
self.save_play_queue()
if value > 5 and self.should_scrobble_song and AdapterManager.can_scrobble_song():
AdapterManager.scrobble_song(self.app_config.state.current_song)
self.should_scrobble_song = False
def on_track_end():
at_end = self.app_config.state.next_song_index is None
no_repeat = self.app_config.state.repeat_type == RepeatType.NO_REPEAT
if at_end and no_repeat:
self.app_config.state.playing = False
self.app_config.state.current_song_index = -1
self.update_window()
return
GLib.idle_add(self.on_next_track)
def on_player_event(event: PlayerEvent):
if event.type == PlayerEvent.EventType.PLAY_STATE_CHANGE:
assert event.playing is not None
self.app_config.state.playing = event.playing
if self.dbus_manager:
self.dbus_manager.property_diff()
self.update_window()
elif event.type == PlayerEvent.EventType.VOLUME_CHANGE:
assert event.volume is not None
self.app_config.state.volume = event.volume
if self.dbus_manager:
self.dbus_manager.property_diff()
self.update_window()
elif event.type == PlayerEvent.EventType.STREAM_CACHE_PROGRESS_CHANGE:
if (
self.loading_state
or not self.window
or not self.app_config.state.current_song
or event.stream_cache_duration is None
):
return
self.app_config.state.song_stream_cache_progress = timedelta(
seconds=event.stream_cache_duration
)
GLib.idle_add(
self.window.player_controls.update_scrubber,
self.app_config.state.song_progress,
self.app_config.state.current_song.duration,
self.app_config.state.song_stream_cache_progress,
)
elif event.type == PlayerEvent.EventType.DISCONNECT:
self.app_config.state.current_device = "this device"
self.player_manager.set_current_device_id(self.app_config.state.current_device)
self.player_manager.set_volume(self.app_config.state.volume)
self.update_window()
def player_device_change_callback(event: PlayerDeviceEvent):
state_device = self.app_config.state.current_device
if event.delta == PlayerDeviceEvent.Delta.ADD:
# If the device added is the one that's supposed to be active, activate
# it and set the volume.
if event.id == state_device:
self.player_manager.set_current_device_id(self.app_config.state.current_device)
self.player_manager.set_volume(self.app_config.state.volume)
self.app_config.state.connected_device_name = event.name
self.app_config.state.connecting_to_device = False
self.app_config.state.available_players[event.player_type].add(
(event.id, event.name)
)
elif event.delta == PlayerDeviceEvent.Delta.REMOVE:
if state_device == event.id:
self.player_manager.pause()
self.app_config.state.available_players[event.player_type].remove(
(event.id, event.name)
)
self.update_window()
self.app_config.state.connecting_to_device = True
def check_if_connected():
if not self.app_config.state.connecting_to_device:
return
self.app_config.state.current_device = "this device"
self.app_config.state.connecting_to_device = False
self.player_manager.set_current_device_id(self.app_config.state.current_device)
self.update_window()
self.player_manager = PlayerManager(
on_timepos_change,
on_track_end,
on_player_event,
lambda *a: GLib.idle_add(player_device_change_callback, *a),
self.app_config.player_config,
)
GLib.timeout_add(10000, check_if_connected)
# Update after Adapter Initial Sync
def after_initial_sync(_):
self.update_window()
# Prompt to load the play queue from the server.
if AdapterManager.can_get_play_queue():
self.update_play_state_from_server(prompt_confirm=True)
# Get the playlists, just so that we don't have tons of cache misses from
# DBus trying to get the playlists.
if AdapterManager.can_get_playlists():
AdapterManager.get_playlists()
inital_sync_result = AdapterManager.initial_sync()
inital_sync_result.add_done_callback(after_initial_sync)
# Send out to the bus that we exist.
if self.dbus_manager:
self.dbus_manager.property_diff()
# ########## DBUS MANAGMENT ########## #
[docs] def do_dbus_register(self, connection: Gio.DBusConnection, path: str) -> bool:
self.dbus_manager = DBusManager(
connection,
self.on_dbus_method_call,
self.on_dbus_set_property,
lambda: (self.app_config, self.player_manager),
)
return True
[docs] def on_dbus_method_call(
self,
connection: Gio.DBusConnection,
sender: str,
path: str,
interface: str,
method: str,
params: GLib.Variant,
invocation: Gio.DBusMethodInvocation,
):
def seek_fn(offset: float):
if not self.app_config.state.current_song:
return
new_seconds = self.app_config.state.song_progress + timedelta(microseconds=offset)
# This should not ever happen. The current_song should always have
# a duration, but the Child object has `duration` optional because
# it could be a directory.
assert self.app_config.state.current_song.duration is not None
self.on_song_scrub(
None,
(
new_seconds.total_seconds()
/ self.app_config.state.current_song.duration.total_seconds()
)
* 100,
)
def set_pos_fn(track_id: str, position: float = 0):
if self.app_config.state.playing:
self.on_play_pause()
pos_seconds = timedelta(microseconds=position)
self.app_config.state.song_progress = pos_seconds
track_id, occurrence = track_id.split("/")[-2:]
# Find the (N-1)th time that the track id shows up in the list. (N
# is the -*** suffix on the track id.)
song_index = [
i for i, x in enumerate(self.app_config.state.play_queue) if x == track_id
][int(occurrence) or 0]
self.play_song(song_index)
def get_tracks_metadata(track_ids: List[str]) -> GLib.Variant:
if not self.dbus_manager:
return
if len(track_ids) == 0:
# We are lucky, just return an empty list.
return GLib.Variant("(aa{sv})", ([],))
# Have to calculate all of the metadatas so that we can deal with
# repeat song IDs.
metadatas: Iterable[Any] = [
self.dbus_manager.get_mpris_metadata(
i,
self.app_config.state.play_queue,
)
for i in range(len(self.app_config.state.play_queue))
]
# Get rid of all of the tracks that were not requested.
metadatas = list(filter(lambda m: m["mpris:trackid"] in track_ids, metadatas))
assert len(metadatas) == len(track_ids)
# Sort them so they get returned in the same order as they were
# requested.
metadatas = sorted(metadatas, key=lambda m: track_ids.index(m["mpris:trackid"]))
# Turn them into dictionaries that can actually be serialized into
# a GLib.Variant.
metadatas = [{k: DBusManager.to_variant(v) for k, v in m.items()} for m in metadatas]
return GLib.Variant("(aa{sv})", (metadatas,))
def activate_playlist(playlist_id: str):
playlist_id = playlist_id.split("/")[-1]
playlist = AdapterManager.get_playlist_details(playlist_id).result()
# Calculate the song id to play.
song_idx = 0
if self.app_config.state.shuffle_on:
song_idx = random.randint(0, len(playlist.songs) - 1)
self.on_song_clicked(
None,
song_idx,
tuple(s.id for s in playlist.songs),
{"active_playlist_id": playlist_id},
)
def get_playlists(
index: int,
max_count: int,
order: str,
reverse_order: bool,
) -> GLib.Variant:
playlists_result = AdapterManager.get_playlists()
if not playlists_result.data_is_available:
# We don't want to wait for the response in this case, so just
# return an empty array.
return GLib.Variant("(a(oss))", ([],))
playlists = list(playlists_result.result())
sorters = {
"Alphabetical": lambda p: p.name,
"Created": lambda p: p.created,
"Modified": lambda p: p.changed,
}
if order in sorters:
playlists.sort(
key=sorters.get(order, lambda p: p),
reverse=reverse_order,
)
def make_playlist_tuple(p: Playlist) -> GLib.Variant:
cover_art_filename = AdapterManager.get_cover_art_uri(
p.cover_art,
"file",
allow_download=False,
).result()
return (f"/playlist/{p.id}", p.name, cover_art_filename or "")
return GLib.Variant(
"(a(oss))",
([make_playlist_tuple(p) for p in playlists[index : (index + max_count)]],),
)
def play():
if not self.app_config.state.playing:
self.on_play_pause()
def pause():
if self.app_config.state.playing:
self.on_play_pause()
method_call_map: Dict[str, Dict[str, Any]] = {
"org.mpris.MediaPlayer2": {
"Raise": self.window and self.window.present,
"Quit": self.window and self.window.destroy,
},
"org.mpris.MediaPlayer2.Player": {
"Next": self.on_next_track,
"Previous": self.on_prev_track,
"Pause": pause,
"PlayPause": self.on_play_pause,
"Stop": pause,
"Play": play,
"Seek": seek_fn,
"SetPosition": set_pos_fn,
},
"org.mpris.MediaPlayer2.TrackList": {
"GoTo": set_pos_fn,
"GetTracksMetadata": get_tracks_metadata,
# 'RemoveTrack': remove_track,
},
"org.mpris.MediaPlayer2.Playlists": {
"ActivatePlaylist": activate_playlist,
"GetPlaylists": get_playlists,
},
}
method_fn = method_call_map.get(interface, {}).get(method)
if method_fn is None:
logging.warning(f"Unknown/unimplemented method: {interface}.{method}.")
invocation.return_value(method_fn(*params) if callable(method_fn) else None)
[docs] def on_dbus_set_property(
self,
connection: Gio.DBusConnection,
sender: str,
path: str,
interface: str,
property_name: str,
value: GLib.Variant,
):
def change_loop(new_loop_status: GLib.Variant):
self.app_config.state.repeat_type = RepeatType.from_mpris_loop_status(
new_loop_status.get_string()
)
self.update_window()
def set_shuffle(new_val: GLib.Variant):
if new_val.get_boolean() != self.app_config.state.shuffle_on:
self.on_shuffle_press(None, None)
def set_volume(new_val: GLib.Variant):
self.on_volume_change(None, new_val.get_double() * 100)
setter_map: Dict[str, Dict[str, Any]] = {
"org.mpris.MediaPlayer2.Player": {
"LoopStatus": change_loop,
"Rate": lambda _: None,
"Shuffle": set_shuffle,
"Volume": set_volume,
}
}
setter = setter_map.get(interface, {}).get(property_name)
if setter is None:
logging.warning("Set: Unknown property: {property_name}.")
return
if callable(setter):
setter(value)
# ########## ACTION HANDLERS ########## #
[docs] @dbus_propagate()
def on_refresh_window(self, _, state_updates: Dict[str, Any], force: bool = False):
if settings := state_updates.get("__settings__"):
for k, v in settings.items():
setattr(self.app_config, k, v)
if (offline_mode := settings.get("offline_mode")) is not None:
AdapterManager.on_offline_mode_change(offline_mode)
del state_updates["__settings__"]
self.app_config.save()
if player_setting := state_updates.get("__player_setting__"):
player_name, option_name, value = player_setting
self.app_config.player_config[player_name][option_name] = value
del state_updates["__player_setting__"]
if pm := self.player_manager:
pm.change_settings(self.app_config.player_config)
self.app_config.save()
for k, v in state_updates.items():
setattr(self.app_config.state, k, v)
self.update_window(force=force)
[docs] def on_notification_closed(self, _):
self.app_config.state.current_notification = None
self.update_window()
[docs] def on_add_new_music_provider(self, *args):
self.show_configure_servers_dialog()
[docs] def on_edit_current_music_provider(self, *args):
assert self.app_config.provider
self.show_configure_servers_dialog(self.app_config.provider.clone())
[docs] def on_switch_music_provider(self, _, provider_id: GLib.Variant):
if self.app_config.state.playing:
self.on_play_pause()
self.app_config.save()
self.app_config.current_provider_id = provider_id.get_string()
self.reset_state()
self.app_config.save()
[docs] def on_remove_music_provider(self, _, provider_id: GLib.Variant):
provider = self.app_config.providers[provider_id.get_string()]
confirm_dialog = Gtk.MessageDialog(
transient_for=self.window,
message_type=Gtk.MessageType.WARNING,
buttons=(
Gtk.STOCK_CANCEL,
Gtk.ResponseType.CANCEL,
Gtk.STOCK_DELETE,
Gtk.ResponseType.YES,
),
text=f"Are you sure you want to delete the {provider.name} music provider?",
)
confirm_dialog.format_secondary_markup(
"Deleting this music provider will delete all cached songs and metadata "
"associated with this provider."
)
if confirm_dialog.run() == Gtk.ResponseType.YES:
assert self.app_config.cache_location
provider_dir = self.app_config.cache_location.joinpath(provider.id)
shutil.rmtree(str(provider_dir), ignore_errors=True)
del self.app_config.providers[provider.id]
confirm_dialog.destroy()
[docs] def on_window_go_to(self, win: Any, action: str, value: str):
{
"album": self.on_go_to_album,
"artist": self.on_go_to_artist,
"playlist": self.on_go_to_playlist,
}[action](None, GLib.Variant("s", value))
[docs] @dbus_propagate()
def on_play_pause(self, *args):
if self.app_config.state.current_song_index < 0:
return
self.app_config.state.playing = not self.app_config.state.playing
if self.player_manager.song_loaded and (
self.player_manager.current_song == self.app_config.state.current_song
):
self.player_manager.toggle_play()
self.save_play_queue()
else:
# This is from a restart, start playing the file.
self.play_song(self.app_config.state.current_song_index)
self.update_window()
[docs] def on_next_track(self, *args):
if self.app_config.state.current_song is None:
# This may happen due to DBUS, ignore.
return
song_index_to_play = self.app_config.state.next_song_index
if song_index_to_play is None:
# We may end up here due to D-Bus.
return
self.app_config.state.current_song_index = song_index_to_play
self.app_config.state.song_progress = timedelta(0)
if self.app_config.state.playing:
self.play_song(song_index_to_play, reset=True)
else:
self.update_window()
[docs] def on_prev_track(self, *args):
if self.app_config.state.current_song is None:
# This may happen due to DBUS, ignore.
return
# Go back to the beginning of the song if we are past 5 seconds.
# Otherwise, go to the previous song.
no_repeat = self.app_config.state.repeat_type == RepeatType.NO_REPEAT
if self.app_config.state.repeat_type == RepeatType.REPEAT_SONG:
song_index_to_play = self.app_config.state.current_song_index
elif self.app_config.state.song_progress.total_seconds() < 5:
if self.app_config.state.current_song_index == 0 and no_repeat:
song_index_to_play = 0
else:
song_index_to_play = (self.app_config.state.current_song_index - 1) % len(
self.app_config.state.play_queue
)
else:
# Go back to the beginning of the song.
song_index_to_play = self.app_config.state.current_song_index
self.app_config.state.current_song_index = song_index_to_play
self.app_config.state.song_progress = timedelta(0)
if self.player_manager.playing:
self.play_song(
song_index_to_play,
reset=True,
# search backwards for a song to play if offline
playable_song_search_direction=-1,
)
else:
self.update_window()
[docs] @dbus_propagate()
def on_repeat_press(self, *args):
# Cycle through the repeat types.
new_repeat_type = RepeatType((self.app_config.state.repeat_type.value + 1) % 3)
self.app_config.state.repeat_type = new_repeat_type
self.update_window()
[docs] @dbus_propagate()
def on_shuffle_press(self, *args):
if self.app_config.state.shuffle_on:
# Revert to the old play queue.
old_play_queue_copy = self.app_config.state.old_play_queue
self.app_config.state.current_song_index = (
old_play_queue_copy.index(self.app_config.state.current_song.id)
if self.app_config.state.current_song
else 0
)
self.app_config.state.play_queue = old_play_queue_copy
else:
self.app_config.state.old_play_queue = self.app_config.state.play_queue
mutable_play_queue = list(self.app_config.state.play_queue)
# Remove the current song, then shuffle and put the song back.
song_id = (
self.app_config.state.current_song.id
if self.app_config.state.current_song
else None
)
del mutable_play_queue[self.app_config.state.current_song_index]
random.shuffle(mutable_play_queue)
if song_id:
self.app_config.state.play_queue = (song_id,) + tuple(mutable_play_queue)
else:
self.app_config.state.play_queue = tuple(mutable_play_queue)
self.app_config.state.current_song_index = 0
self.app_config.state.shuffle_on = not self.app_config.state.shuffle_on
self.update_window()
[docs] @dbus_propagate()
def on_play_next(self, action: Any, song_ids: GLib.Variant):
song_ids = tuple(song_ids)
if self.app_config.state.current_song is None:
insert_at = 0
else:
insert_at = self.app_config.state.current_song_index + 1
self.app_config.state.play_queue = (
self.app_config.state.play_queue[:insert_at]
+ song_ids
+ self.app_config.state.play_queue[insert_at:]
)
self.app_config.state.old_play_queue += song_ids
self.update_window()
[docs] @dbus_propagate()
def on_add_to_queue(self, action: Any, song_ids: GLib.Variant):
song_ids = tuple(song_ids)
self.app_config.state.play_queue += tuple(song_ids)
self.app_config.state.old_play_queue += tuple(song_ids)
self.update_window()
[docs] def on_go_to_album(self, action: Any, album_id: GLib.Variant):
# Switch to the Alphabetical by Name view to guarantee that the album is there.
self.app_config.state.current_album_search_query = AlbumSearchQuery(
AlbumSearchQuery.Type.ALPHABETICAL_BY_NAME,
genre=self.app_config.state.current_album_search_query.genre,
year_range=self.app_config.state.current_album_search_query.year_range,
)
self.app_config.state.current_tab = "albums"
self.app_config.state.selected_album_id = album_id.get_string()
self.update_window()
[docs] def on_go_to_artist(self, action: Any, artist_id: GLib.Variant):
self.app_config.state.current_tab = "artists"
self.app_config.state.selected_artist_id = artist_id.get_string()
self.update_window()
[docs] def browse_to(self, action: Any, item_id: GLib.Variant):
self.app_config.state.current_tab = "browse"
self.app_config.state.selected_browse_element_id = item_id.get_string()
self.update_window()
[docs] def on_go_to_playlist(self, action: Any, playlist_id: GLib.Variant):
self.app_config.state.current_tab = "playlists"
self.app_config.state.selected_playlist_id = playlist_id.get_string()
self.update_window()
[docs] def on_go_online(self, *args):
self.on_refresh_window(None, {"__settings__": {"offline_mode": False}})
[docs] def on_refresh_devices(self, *args):
self.player_manager.refresh_players()
[docs] def reset_state(self):
if self.app_config.state.playing:
self.on_play_pause()
self.loading_state = True
self.player_manager.reset()
AdapterManager.reset(self.app_config, self.on_song_download_progress)
self.loading_state = False
# Update the window according to the new server configuration.
self.update_window()
[docs] def on_stack_change(self, stack: Gtk.Stack, _):
self.app_config.state.current_tab = stack.get_visible_child_name()
self.update_window()
[docs] def on_song_clicked(
self,
win: Any,
song_index: int,
song_queue: Tuple[str, ...],
metadata: Dict[str, Any],
):
song_queue = tuple(song_queue)
# Reset the play queue so that we don't ever revert back to the
# previous one.
old_play_queue = song_queue
if (force_shuffle := metadata.get("force_shuffle_state")) is not None:
self.app_config.state.shuffle_on = force_shuffle
self.app_config.state.active_playlist_id = metadata.get("active_playlist_id")
# If shuffle is enabled, then shuffle the playlist.
if self.app_config.state.shuffle_on and not metadata.get("no_reshuffle"):
song_id = song_queue[song_index]
song_queue_list = list(song_queue[:song_index] + song_queue[song_index + 1 :])
random.shuffle(song_queue_list)
song_queue = (song_id, *song_queue_list)
song_index = 0
self.play_song(
song_index,
reset=True,
old_play_queue=old_play_queue,
play_queue=song_queue,
)
[docs] def on_songs_removed(self, win: Any, song_indexes_to_remove: List[int]):
self.app_config.state.play_queue = tuple(
song_id
for i, song_id in enumerate(self.app_config.state.play_queue)
if i not in song_indexes_to_remove
)
# Determine how many songs before the currently playing one were also
# deleted.
before_current = [
i for i in song_indexes_to_remove if i < self.app_config.state.current_song_index
]
if self.app_config.state.current_song_index in song_indexes_to_remove:
if len(self.app_config.state.play_queue) == 0:
self.on_play_pause()
self.app_config.state.current_song_index = -1
self.update_window()
return
self.app_config.state.current_song_index -= len(before_current)
self.play_song(self.app_config.state.current_song_index, reset=True)
else:
self.app_config.state.current_song_index -= len(before_current)
self.update_window()
self.save_play_queue()
[docs] @dbus_propagate()
def on_song_scrub(self, _, scrub_value: float):
if not self.app_config.state.current_song or not self.window:
return
# This should not ever happen. The current_song should always have
# a duration, but the Child object has `duration` optional because
# it could be a directory.
assert self.app_config.state.current_song.duration is not None
new_time = self.app_config.state.current_song.duration * (scrub_value / 100)
self.app_config.state.song_progress = new_time
self.window.player_controls.update_scrubber(
self.app_config.state.song_progress,
self.app_config.state.current_song.duration,
self.app_config.state.song_stream_cache_progress,
)
# If already playing, then make the player itself seek.
if self.player_manager and self.player_manager.song_loaded:
self.player_manager.seek(new_time)
self.save_play_queue()
[docs] def on_current_song_rated(self, _):
if not self.window:
return
rating = self.window.player_controls.rating_buttons_box.rating
current_song = self.app_config.state.current_song
if not current_song:
return
def on_done(future: Future):
"""Update the UI after a failed or successful rating"""
if future.cancelled():
return
exception = future.exception(timeout=1.0)
if exception:
self.app_config.state.current_notification = UIState.UINotification(
markup="<b>Unable to rate song.</b>",
icon="dialog-error",
)
self.update_window()
elif self.window:
self.window.player_controls.update_rating(rating)
current_song.user_rating = rating
AdapterManager.set_song_rating(current_song, rating).add_done_callback(on_done)
[docs] def on_device_update(self, _, device_id: str):
if device_id == self.app_config.state.current_device:
return
self.app_config.state.current_device = device_id
if was_playing := self.app_config.state.playing:
self.on_play_pause()
self.player_manager.set_current_device_id(self.app_config.state.current_device)
if self.dbus_manager:
self.dbus_manager.property_diff()
self.update_window()
if was_playing:
self.on_play_pause()
if self.dbus_manager:
self.dbus_manager.property_diff()
[docs] @dbus_propagate()
def on_mute_toggle(self, *args):
self.app_config.state.is_muted = not self.app_config.state.is_muted
self.player_manager.set_muted(self.app_config.state.is_muted)
self.update_window()
[docs] @dbus_propagate()
def on_volume_change(self, _, value: float):
self.app_config.state.volume = value
self.player_manager.set_volume(self.app_config.state.volume)
self.update_window()
[docs] def on_window_key_press(self, window: Gtk.Window, event: Gdk.EventKey) -> bool:
# Need to use bitwise & here to see if CTRL is pressed.
if event.keyval == 102 and event.state & Gdk.ModifierType.CONTROL_MASK:
# Ctrl + F
window.search_entry.grab_focus()
return False
if event.keyval == 113 and event.state & Gdk.ModifierType.CONTROL_MASK:
# Ctrl + Q
window.destroy()
return False
# Allow spaces to work in the text entry boxes.
if (
window.search_entry.has_focus()
or window.playlists_panel.playlist_list.new_playlist_entry.has_focus()
):
return False
# Spacebar, home/prev
keymap = {
32: self.on_play_pause,
65360: self.on_prev_track,
65367: self.on_next_track,
}
action = keymap.get(event.keyval)
if action:
action()
return True
return False
[docs] def on_song_download_progress(self, song_id: str, progress: DownloadProgress):
assert self.window
GLib.idle_add(self.window.update_song_download_progress, song_id, progress)
[docs] def on_app_shutdown(self, app: "SublimeMusicApp"):
self.exiting = True
if glib_notify_exists:
Notify.uninit()
if tap_imported and self.tap:
self.tap.stop()
if self.app_config.provider is None:
return
if self.player_manager:
if self.app_config.state.playing:
self.save_play_queue()
self.player_manager.pause()
self.player_manager.shutdown()
self.app_config.save()
if self.dbus_manager:
self.dbus_manager.shutdown()
AdapterManager.shutdown()
# ########## HELPER METHODS ########## #
[docs] def update_window(self, force: bool = False):
if not self.window:
return
logging.info(f"Updating window force={force}")
GLib.idle_add(
lambda: self.window.update(self.app_config, self.player_manager, force=force)
)
[docs] def update_play_state_from_server(self, prompt_confirm: bool = False):
# TODO (#129): need to make the play queue list loading for the duration here if
# prompt_confirm is False.
if not prompt_confirm and self.app_config.state.playing:
self.player_manager.pause()
self.app_config.state.playing = False
self.app_config.state.loading_play_queue = True
self.update_window()
def do_update(f: Result[PlayQueue]):
play_queue = f.result()
if not play_queue:
self.app_config.state.loading_play_queue = False
return
play_queue.position = play_queue.position or timedelta(0)
new_play_queue = tuple(s.id for s in play_queue.songs)
new_song_progress = play_queue.position
def do_resume(clear_notification: bool):
if was_playing := self.app_config.state.playing:
self.on_play_pause()
self.app_config.state.play_queue = new_play_queue
self.app_config.state.song_progress = play_queue.position
self.app_config.state.current_song_index = play_queue.current_index or 0
self.app_config.state.loading_play_queue = False
self.player_manager.reset()
if clear_notification:
self.app_config.state.current_notification = None
self.update_window()
if was_playing:
self.on_play_pause()
if prompt_confirm:
# If there's not a significant enough difference in the song state,
# don't prompt.
progress_diff = 15.0
if self.app_config.state.song_progress:
progress_diff = abs(
(self.app_config.state.song_progress - new_song_progress).total_seconds()
)
if (
self.app_config.state.play_queue == new_play_queue
and self.app_config.state.current_song
):
song_index = self.app_config.state.current_song_index
if song_index == play_queue.current_index and progress_diff < 15:
return
# Show a notification to resume the play queue.
resume_text = "Do you want to resume the play queue"
if play_queue.changed_by or play_queue.changed:
resume_text += " saved"
if play_queue.changed_by:
resume_text += f" by {play_queue.changed_by}"
if play_queue.changed:
changed_str = play_queue.changed.astimezone(tz=None).strftime(
"%H:%M on %Y-%m-%d"
)
resume_text += f" at {changed_str}"
resume_text += "?"
self.app_config.state.current_notification = UIState.UINotification(
markup=f"<b>{resume_text}</b>",
actions=(("Resume", partial(do_resume, True)),),
)
self.update_window()
else: # just resume the play queue immediately
do_resume(False)
play_queue_future = AdapterManager.get_play_queue()
play_queue_future.add_done_callback(lambda f: GLib.idle_add(do_update, f))
song_playing_order_token = 0
batch_download_jobs: Set[Result] = set()
[docs] def play_song(
self,
song_index: int,
reset: bool = False,
old_play_queue: Tuple[str, ...] | None = None,
play_queue: Tuple[str, ...] | None = None,
playable_song_search_direction: int = 1,
):
def do_reset():
self.player_manager.reset()
self.app_config.state.song_progress = timedelta(0)
self.should_scrobble_song = True
# Tell the player that the next song is available for gapless playback
def do_notify_next_song(next_song: Song):
try:
next_uri = AdapterManager.get_song_file_uri(next_song)
if self.player_manager:
self.player_manager.next_media_cached(next_uri, next_song)
except CacheMissError:
logging.debug("Couldn't find the file for next song for gapless playback")
# Do this the old fashioned way so that we can have access to ``reset``
# in the callback.
@dbus_propagate(self)
def do_play_song(order_token: int, song: Song):
if order_token != self.song_playing_order_token:
return
uri = None
try:
if "file" in self.player_manager.supported_schemes:
uri = AdapterManager.get_song_file_uri(song)
except CacheMissError:
logging.debug("Couldn't find the file, will attempt to stream.")
if not uri:
try:
uri = AdapterManager.get_song_stream_uri(song)
except Exception:
pass
if not uri or urlparse(uri).scheme not in self.player_manager.supported_schemes:
self.app_config.state.current_notification = UIState.UINotification(
markup=f"<b>Unable to play {song.title}.</b>",
icon="dialog-error",
)
return
# Prevent it from doing the thing where it continually loads
# songs when it has to download.
if reset:
do_reset()
# Start playing the song.
if order_token != self.song_playing_order_token:
return
self.player_manager.play_media(
uri,
timedelta(0) if reset else self.app_config.state.song_progress,
song,
)
self.app_config.state.playing = True
self.update_window()
# Check if the next song is available in the cache
if (next_song_index := self.app_config.state.next_song_index) is not None:
next_song_details_future = AdapterManager.get_song_details(
self.app_config.state.play_queue[next_song_index]
)
next_song_details_future.add_done_callback(
lambda f: GLib.idle_add(do_notify_next_song, f.result()),
)
# Show a song play notification.
if self.app_config.song_play_notification:
try:
if glib_notify_exists:
notification_lines = []
if album := song.album:
notification_lines.append(f"<i>{album.name}</i>")
if artist := song.artist:
notification_lines.append(artist.name)
song_notification = Notify.Notification.new(
song.title,
bleach.clean("\n".join(notification_lines)),
)
song_notification.add_action(
"clicked",
"Open Sublime Music",
lambda *a: self.window.present() if self.window else None,
)
song_notification.show()
def on_cover_art_download_complete(cover_art_filename: str):
if order_token != self.song_playing_order_token:
return
# Add the image to the notification, and re-show
# the notification.
song_notification.set_image_from_pixbuf(
GdkPixbuf.Pixbuf.new_from_file_at_scale(
cover_art_filename, 70, 70, True
)
)
song_notification.show()
cover_art_result = AdapterManager.get_cover_art_uri(song.cover_art, "file")
cover_art_result.add_done_callback(
lambda f: on_cover_art_download_complete(f.result())
)
if sys.platform == "darwin":
notification_lines = []
if album := song.album:
notification_lines.append(album.name)
if artist := song.artist:
notification_lines.append(artist.name)
notification_text = "\n".join(notification_lines)
osascript_command = [
"display",
"notification",
f'"{notification_text}"',
"with",
"title",
f'"{song.title}"',
]
os.system(f"osascript -e '{' '.join(osascript_command)}'")
except Exception:
logging.warning(
"Unable to display notification. Is a notification daemon running?" # noqa: E501
)
# Download current song and prefetch songs. Only do this if the adapter can
# download songs and allow_song_downloads is True and download_on_stream is
# True.
def on_song_download_complete(song_id: str):
if order_token != self.song_playing_order_token:
return
# Hotswap to the downloaded song.
if (
# TODO (#182) allow hotswap if not playing. This requires being able
# to replace the currently playing URI with something different.
self.app_config.state.playing
and self.app_config.state.current_song
and self.app_config.state.current_song.id == song_id
):
# Switch to the local media if the player can hotswap without lag.
# For example, MPV can is barely noticable whereas there's quite a
# delay with Chromecast.
if self.player_manager.can_start_playing_with_no_latency:
self.player_manager.play_media(
AdapterManager.get_song_file_uri(song),
self.app_config.state.song_progress,
song,
)
# Handle case where a next-song was previously not cached
# but is now available for the player to use
if self.app_config.state.playing:
next_song_index = self.app_config.state.next_song_index
if (
next_song_index is not None
and self.app_config.state.play_queue[next_song_index] == song_id
):
next_song_details_future = AdapterManager.get_song_details(song_id)
next_song_details_future.add_done_callback(
lambda f: GLib.idle_add(do_notify_next_song, f.result()),
)
# Always update the window
self.update_window()
if (
# This only makes sense if the adapter is networked.
AdapterManager.ground_truth_adapter_is_networked()
# Don't download in offline mode.
and not self.app_config.offline_mode
and self.app_config.allow_song_downloads
and self.app_config.download_on_stream
and AdapterManager.can_batch_download_songs()
):
song_ids = [song.id]
# Add the prefetch songs.
if (repeat_type := self.app_config.state.repeat_type) != RepeatType.REPEAT_SONG:
song_idx = self.app_config.state.play_queue.index(song.id)
is_repeat_queue = RepeatType.REPEAT_QUEUE == repeat_type
prefetch_idxs = []
for i in range(self.app_config.prefetch_amount):
prefetch_idx: int = song_idx + 1 + i
play_queue_len: int = len(self.app_config.state.play_queue)
if is_repeat_queue or prefetch_idx < play_queue_len:
prefetch_idxs.append(prefetch_idx % play_queue_len) # noqa: S001
song_ids.extend([self.app_config.state.play_queue[i] for i in prefetch_idxs])
self.batch_download_jobs.add(
AdapterManager.batch_download_songs(
song_ids,
before_download=lambda _: self.update_window(),
on_song_download_complete=on_song_download_complete,
one_at_a_time=True,
delay=5,
)
)
if old_play_queue:
self.app_config.state.old_play_queue = old_play_queue
if play_queue:
self.app_config.state.play_queue = play_queue
self.app_config.state.current_song_index = song_index
for job in self.batch_download_jobs:
job.cancel()
self.song_playing_order_token += 1
if play_queue:
GLib.timeout_add(
5000,
partial(
self.save_play_queue,
song_playing_order_token=self.song_playing_order_token,
),
)
# If in offline mode, go to the first song in the play queue after the given
# song that is actually playable.
if self.app_config.offline_mode:
statuses = AdapterManager.get_cached_statuses(self.app_config.state.play_queue)
playable_statuses = (
SongCacheStatus.CACHED,
SongCacheStatus.PERMANENTLY_CACHED,
)
can_play = False
current_song_index = self.app_config.state.current_song_index
if statuses[current_song_index] in playable_statuses:
can_play = True
elif self.app_config.state.repeat_type != RepeatType.REPEAT_SONG:
# See if any other songs in the queue are playable.
play_queue_len = len(self.app_config.state.play_queue)
cursor = (current_song_index + playable_song_search_direction) % play_queue_len
for _ in range(play_queue_len): # Don't infinite loop.
if self.app_config.state.repeat_type == RepeatType.NO_REPEAT:
if (
playable_song_search_direction == 1 and cursor < current_song_index
) or (
playable_song_search_direction == -1 and cursor > current_song_index
):
# We wrapped around to the end of the play queue without
# finding a song that can be played, and we aren't allowed
# to loop back.
break
# If we find a playable song, stop and play it.
if statuses[cursor] in playable_statuses:
self.play_song(cursor, reset)
return
cursor = (cursor + playable_song_search_direction) % play_queue_len
if not can_play:
# There are no songs that can be played. Show a notification that you
# have to go online to play anything and then don't go further.
if was_playing := self.app_config.state.playing:
self.on_play_pause()
def go_online_clicked():
self.app_config.state.current_notification = None
self.on_go_online()
if was_playing:
self.on_play_pause()
if all(s == SongCacheStatus.NOT_CACHED for s in statuses):
markup = (
"<b>None of the songs in your play queue are cached for "
"offline playback.</b>\nGo online to start playing your queue."
)
else:
markup = (
"<b>None of the remaining songs in your play queue are cached "
"for offline playback.</b>\nGo online to contiue playing your "
"queue."
)
self.app_config.state.current_notification = UIState.UINotification(
icon="cloud-offline-symbolic",
markup=markup,
actions=(("Go Online", go_online_clicked),),
)
if reset:
do_reset()
self.update_window()
return
song_details_future = AdapterManager.get_song_details(
self.app_config.state.play_queue[self.app_config.state.current_song_index]
)
if song_details_future.data_is_available:
song_details_future.add_done_callback(
lambda f: do_play_song(self.song_playing_order_token, f.result())
)
else:
song_details_future.add_done_callback(
lambda f: GLib.idle_add(
partial(do_play_song, self.song_playing_order_token), f.result()
),
)
[docs] def save_play_queue(self, song_playing_order_token: int | None = None):
if (
len(self.app_config.state.play_queue) == 0
or self.app_config.provider is None
or (
song_playing_order_token
and song_playing_order_token != self.song_playing_order_token
)
):
return
position = self.app_config.state.song_progress
self.last_play_queue_update = position or timedelta(0)
if AdapterManager.can_save_play_queue() and self.app_config.state.current_song:
AdapterManager.save_play_queue(
song_ids=self.app_config.state.play_queue,
current_song_index=self.app_config.state.current_song_index,
position=position,
)