Source code for sublime_music.dbus.manager

import functools
import logging
import re
from collections import defaultdict
from datetime import timedelta
from typing import Any, Callable, DefaultDict, Dict, List, Match, Optional, Tuple

from deepdiff import DeepDiff
from gi.repository import Gio, GLib

from ..adapters import AdapterManager, CacheMissError
from ..config import AppConfiguration
from ..players import PlayerManager
from ..ui.state import RepeatType
from ..util import resolve_path


[docs]def dbus_propagate(param_self: Any = None) -> Callable: """Wraps a function which causes changes to DBus properties.""" def decorator(function: Callable) -> Callable: @functools.wraps(function) def wrapper(*args): function(*args) if (param_self or args[0]).dbus_manager: (param_self or args[0]).dbus_manager.property_diff() return wrapper return decorator
[docs]class DBusManager: second_microsecond_conversion = 1000000 current_state: Dict = {}
[docs] def __init__( self, connection: Gio.DBusConnection, do_on_method_call: Callable[ [ Gio.DBusConnection, str, str, str, str, GLib.Variant, Gio.DBusMethodInvocation, ], None, ], on_set_property: Callable[[Gio.DBusConnection, str, str, str, str, GLib.Variant], None], get_config_and_player_manager: Callable[ [], Tuple[AppConfiguration, Optional[PlayerManager]] ], ): self.get_config_and_player_manager = get_config_and_player_manager self.do_on_method_call = do_on_method_call self.on_set_property = on_set_property self.connection = connection def dbus_bus_acquired(connection: Gio.DBusConnection, name: str): specs = [ "org.mpris.MediaPlayer2.xml", "org.mpris.MediaPlayer2.Player.xml", "org.mpris.MediaPlayer2.Playlists.xml", "org.mpris.MediaPlayer2.TrackList.xml", ] for spec in specs: spec_path = resolve_path("dbus/mpris_specs", spec) with open(spec_path) as f: node_info = Gio.DBusNodeInfo.new_for_xml(f.read()) connection.register_object( "/org/mpris/MediaPlayer2", node_info.interfaces[0], self.on_method_call, self.on_get_property, self.on_set_property, ) # TODO (#127): I have no idea what to do here. def dbus_name_lost(*args): pass self.bus_number = Gio.bus_own_name( Gio.BusType.SESSION, "org.mpris.MediaPlayer2.sublimemusic", Gio.BusNameOwnerFlags.NONE, dbus_bus_acquired, None, dbus_name_lost, )
[docs] def shutdown(self): logging.info("DBusManager is shutting down.") self.property_diff() Gio.bus_unown_name(self.bus_number)
[docs] def on_get_property( self, connection: Gio.DBusConnection, sender: str, path: str, interface: str, property_name: str, ) -> GLib.Variant: value = self.property_dict().get(interface, {}).get(property_name) return DBusManager.to_variant(value)
[docs] def on_method_call( self, connection: Gio.DBusConnection, sender: str, path: str, interface: str, method: str, params: GLib.Variant, invocation: Gio.DBusMethodInvocation, ): # TODO (#127): I don't really know if this works. if interface == "org.freedesktop.DBus.Properties": if method == "Get": invocation.return_value( self.on_get_property(connection, sender, path, interface, *params) ) elif method == "Set": self.on_set_property(connection, sender, path, interface, *params) elif method == "GetAll": all_properties = { k: DBusManager.to_variant(v) for k, v in self.property_dict()[interface].items() } invocation.return_value(GLib.Variant("(a{sv})", (all_properties,))) return self.do_on_method_call( connection, sender, path, interface, method, params, invocation, )
[docs] @staticmethod def to_variant(value: Any) -> GLib.Variant: if callable(value): return DBusManager.to_variant(value()) if isinstance(value, GLib.Variant): return value if type(value) == tuple: return GLib.Variant(*value) if type(value) == dict: return GLib.Variant( "a{sv}", {k: DBusManager.to_variant(v) for k, v in value.items()}, ) variant_type = {list: "as", str: "s", int: "i", float: "d", bool: "b"}.get(type(value)) if not variant_type: return value return GLib.Variant(variant_type, value)
_escape_re = re.compile(r"[^\w]+") @staticmethod @functools.lru_cache(maxsize=1024) def _escape_id(id: str) -> str: """ Escapes an ID for use in a DBus object identifier. >>> DBusManager._escape_id("tr-1843") 'tr_0x45_1843' >>> DBusManager._escape_id("bc9c7726-8739-4add-8df0-88c6233f37df") 'bc9c7726_0x45_8739_0x45_4add_0x45_8df0_0x45_88c6233f37df' >>> DBusManager._escape_id("spaces spaces everywhere") 'spaces_0x32_spaces_0x32_everywhere' >>> DBusManager._escape_id("already/has/slashes") 'already_0x47_has_0x47_slashes' """ def replace(m: Match[str]) -> str: return f"_0x{ord(m[0])}_" return DBusManager._escape_re.sub(replace, id)
[docs] def property_dict(self) -> Dict[str, dict[str, Any]]: config, player_manager = self.get_config_and_player_manager() if config is None or player_manager is None: return {} state = config.state has_current_song = state.current_song is not None has_next_song = False if state.repeat_type in (RepeatType.REPEAT_QUEUE, RepeatType.REPEAT_SONG): has_next_song = True elif has_current_song: has_next_song = state.current_song_index < len(state.play_queue) - 1 active_playlist = self.get_active_playlist(state.active_playlist_id) playlist_count = 0 try: get_playlists_result = AdapterManager.get_playlists(allow_download=False) if get_playlists_result.data_is_available: playlist_count = len(get_playlists_result.result()) except Exception: pass playback_status_map: dict[tuple[bool, bool], str] = { (False, False): "Stopped", (False, True): "Stopped", (True, False): "Paused", (True, True): "Playing", } return { "org.mpris.MediaPlayer2": { "CanQuit": True, "CanRaise": True, "HasTrackList": True, "Identity": "Sublime Music", "DesktopEntry": "sublime-music", "SupportedUriSchemes": [], "SupportedMimeTypes": [], }, "org.mpris.MediaPlayer2.Player": { "PlaybackStatus": playback_status_map[player_manager.song_loaded, state.playing], "LoopStatus": state.repeat_type.as_mpris_loop_status(), "Rate": 1.0, "Shuffle": state.shuffle_on, "Metadata": self.get_mpris_metadata( state.current_song_index, state.play_queue, ) if state.current_song else {}, "Volume": 0.0 if state.is_muted else state.volume / 100, "Position": ( "x", int( max(state.song_progress.total_seconds(), 0) * self.second_microsecond_conversion ), ), "MinimumRate": 1.0, "MaximumRate": 1.0, "CanGoNext": has_current_song and has_next_song, "CanGoPrevious": has_current_song, "CanPlay": True, "CanPause": True, "CanSeek": True, "CanControl": True, }, "org.mpris.MediaPlayer2.TrackList": { "Tracks": DBusManager.get_dbus_playlist(state.play_queue), "CanEditTracks": False, }, "org.mpris.MediaPlayer2.Playlists": { "PlaylistCount": playlist_count, "Orderings": ["Alphabetical", "Created", "Modified"], "ActivePlaylist": ("(b(oss))", active_playlist), }, }
[docs] @functools.lru_cache(maxsize=10) def get_active_playlist(self, active_playlist_id: Optional[str]) -> Tuple[bool, GLib.Variant]: if not active_playlist_id or not AdapterManager.can_get_playlist_details(): return (False, GLib.Variant("(oss)", ("/", "", ""))) try: playlist = AdapterManager.get_playlist_details( active_playlist_id, allow_download=False ).result() try: cover_art = AdapterManager.get_cover_art_uri( playlist.cover_art, "file", allow_download=False ).result() except CacheMissError: cover_art = "" return ( True, GLib.Variant( "(oss)", ( "/playlist/" + DBusManager._escape_id(playlist.id), playlist.name, cover_art, ), ), ) except Exception: logging.exception("Couldn't get playlist details") return (False, GLib.Variant("(oss)", ("/", "", "")))
[docs] @functools.lru_cache(maxsize=10) def get_mpris_metadata(self, idx: int, play_queue: Tuple[str, ...]) -> Dict[str, Any]: try: song = AdapterManager.get_song_details(play_queue[idx], allow_download=False).result() except Exception: return {} trackid = DBusManager.get_dbus_playlist(play_queue)[idx] duration = ( "x", int( (song.duration or timedelta(0)).total_seconds() * self.second_microsecond_conversion ), ) try: cover_art = AdapterManager.get_cover_art_uri( song.cover_art, "file", allow_download=False ).result() cover_art = "file://" + cover_art except CacheMissError: cover_art = "" artist_name = song.artist.name if song.artist else "" return { "mpris:trackid": trackid, "mpris:length": duration, # Art URIs should be sent as (UTF-8) strings. # Local files should use the "file://" schema. "mpris:artUrl": cover_art, # TODO (#71) use walrus once MYPY isn't retarded "xesam:album": (song.album.name if song.album else ""), "xesam:albumArtist": [artist_name], "xesam:artist": [artist_name], "xesam:title": song.title, }
[docs] @staticmethod @functools.lru_cache(maxsize=20) def get_dbus_playlist(play_queue: Tuple[str, ...]) -> List[str]: """ Gets a playlist formatted for DBus. If multiples of the same element exist in the queue, it will use ``/0`` after the song ID to differentiate between the instances. >>> DBusManager.get_dbus_playlist(("2", "1", "3", "1")) ['/song/2/0', '/song/1/0', '/song/3/0', '/song/1/1'] """ seen_counts: DefaultDict[str, int] = defaultdict(int) tracks = [] for song_id in play_queue: num_seen = seen_counts[song_id] tracks.append(f"/song/{DBusManager._escape_id(song_id)}/{num_seen}") seen_counts[song_id] += 1 return tracks
diff_parse_re = re.compile(r"root\['(.*?)'\]\['(.*?)'\](?:\[.*\])?")
[docs] def property_diff(self): new_property_dict = self.property_dict() diff = DeepDiff(self.current_state, new_property_dict) changes: dict[str, dict[str, Any]] = defaultdict(dict) for path, change in diff.get("values_changed", {}).items(): if m := self.diff_parse_re.match(path): interface, property_name = m.groups() changes[interface][property_name] = change["new_value"] else: logging.warning(f"Couldn't parse path {path} for diff") if diff.get("dictionary_item_added"): changes = new_property_dict for interface, changed_props in changes.items(): # If the metadata has changed, just make the entire Metadata object # part of the update. if "Metadata" in changed_props.keys(): changed_props["Metadata"] = new_property_dict[interface]["Metadata"] # Special handling for when the position changes (a seek). # Technically, I'm sending this signal too often, but I don't think # it really matters. if interface == "org.mpris.MediaPlayer2.Player" and "Position" in changed_props: self.connection.emit_signal( None, "/org/mpris/MediaPlayer2", interface, "Seeked", GLib.Variant("(x)", (changed_props["Position"][1],)), ) # Do not emit the property change. del changed_props["Position"] # Special handling for when the track list changes. # Technically, I'm supposed to use `TrackAdded` and `TrackRemoved` # signals when minor changes occur, but the docs also say that: # # > It is left up to the implementation to decide when a change to # > the track list is invasive enough that this signal should be # > emitted instead of a series of TrackAdded and TrackRemoved # > signals. # # So I think that any change is invasive enough that I should use # this signal. if interface == "org.mpris.MediaPlayer2.TrackList" and "Tracks" in changed_props: track_list = changed_props["Tracks"] if len(track_list) > 0: current_track = new_property_dict["org.mpris.MediaPlayer2.Player"][ "Metadata" ].get("mpris:trackid", track_list[0]) self.connection.emit_signal( None, "/org/mpris/MediaPlayer2", interface, "TrackListReplaced", GLib.Variant("(aoo)", (track_list, current_track)), ) self.connection.emit_signal( None, "/org/mpris/MediaPlayer2", "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant( "(sa{sv}as)", ( interface, {k: DBusManager.to_variant(v) for k, v in changed_props.items()}, [], ), ), ) # Update state for next diff. self.current_state = new_property_dict