import functools
import logging
import re
from collections import defaultdict
from datetime import timedelta
from typing import Any, Callable, DefaultDict, Dict, List, Match, Optional, Tuple
from deepdiff import DeepDiff
from gi.repository import Gio, GLib
from ..adapters import AdapterManager, CacheMissError
from ..config import AppConfiguration
from ..players import PlayerManager
from ..ui.state import RepeatType
from ..util import resolve_path
[docs]def dbus_propagate(param_self: Any = None) -> Callable:
"""Wraps a function which causes changes to DBus properties."""
def decorator(function: Callable) -> Callable:
@functools.wraps(function)
def wrapper(*args):
function(*args)
if (param_self or args[0]).dbus_manager:
(param_self or args[0]).dbus_manager.property_diff()
return wrapper
return decorator
[docs]class DBusManager:
second_microsecond_conversion = 1000000
current_state: Dict = {}
[docs] def __init__(
self,
connection: Gio.DBusConnection,
do_on_method_call: Callable[
[
Gio.DBusConnection,
str,
str,
str,
str,
GLib.Variant,
Gio.DBusMethodInvocation,
],
None,
],
on_set_property: Callable[[Gio.DBusConnection, str, str, str, str, GLib.Variant], None],
get_config_and_player_manager: Callable[
[], Tuple[AppConfiguration, Optional[PlayerManager]]
],
):
self.get_config_and_player_manager = get_config_and_player_manager
self.do_on_method_call = do_on_method_call
self.on_set_property = on_set_property
self.connection = connection
def dbus_bus_acquired(connection: Gio.DBusConnection, name: str):
specs = [
"org.mpris.MediaPlayer2.xml",
"org.mpris.MediaPlayer2.Player.xml",
"org.mpris.MediaPlayer2.Playlists.xml",
"org.mpris.MediaPlayer2.TrackList.xml",
]
for spec in specs:
spec_path = resolve_path("dbus/mpris_specs", spec)
with open(spec_path) as f:
node_info = Gio.DBusNodeInfo.new_for_xml(f.read())
connection.register_object(
"/org/mpris/MediaPlayer2",
node_info.interfaces[0],
self.on_method_call,
self.on_get_property,
self.on_set_property,
)
# TODO (#127): I have no idea what to do here.
def dbus_name_lost(*args):
pass
self.bus_number = Gio.bus_own_name(
Gio.BusType.SESSION,
"org.mpris.MediaPlayer2.sublimemusic",
Gio.BusNameOwnerFlags.NONE,
dbus_bus_acquired,
None,
dbus_name_lost,
)
[docs] def shutdown(self):
logging.info("DBusManager is shutting down.")
self.property_diff()
Gio.bus_unown_name(self.bus_number)
[docs] def on_get_property(
self,
connection: Gio.DBusConnection,
sender: str,
path: str,
interface: str,
property_name: str,
) -> GLib.Variant:
value = self.property_dict().get(interface, {}).get(property_name)
return DBusManager.to_variant(value)
[docs] def on_method_call(
self,
connection: Gio.DBusConnection,
sender: str,
path: str,
interface: str,
method: str,
params: GLib.Variant,
invocation: Gio.DBusMethodInvocation,
):
# TODO (#127): I don't really know if this works.
if interface == "org.freedesktop.DBus.Properties":
if method == "Get":
invocation.return_value(
self.on_get_property(connection, sender, path, interface, *params)
)
elif method == "Set":
self.on_set_property(connection, sender, path, interface, *params)
elif method == "GetAll":
all_properties = {
k: DBusManager.to_variant(v)
for k, v in self.property_dict()[interface].items()
}
invocation.return_value(GLib.Variant("(a{sv})", (all_properties,)))
return
self.do_on_method_call(
connection,
sender,
path,
interface,
method,
params,
invocation,
)
[docs] @staticmethod
def to_variant(value: Any) -> GLib.Variant:
if callable(value):
return DBusManager.to_variant(value())
if isinstance(value, GLib.Variant):
return value
if type(value) == tuple:
return GLib.Variant(*value)
if type(value) == dict:
return GLib.Variant(
"a{sv}",
{k: DBusManager.to_variant(v) for k, v in value.items()},
)
variant_type = {list: "as", str: "s", int: "i", float: "d", bool: "b"}.get(type(value))
if not variant_type:
return value
return GLib.Variant(variant_type, value)
_escape_re = re.compile(r"[^\w]+")
@staticmethod
@functools.lru_cache(maxsize=1024)
def _escape_id(id: str) -> str:
"""
Escapes an ID for use in a DBus object identifier.
>>> DBusManager._escape_id("tr-1843")
'tr_0x45_1843'
>>> DBusManager._escape_id("bc9c7726-8739-4add-8df0-88c6233f37df")
'bc9c7726_0x45_8739_0x45_4add_0x45_8df0_0x45_88c6233f37df'
>>> DBusManager._escape_id("spaces spaces everywhere")
'spaces_0x32_spaces_0x32_everywhere'
>>> DBusManager._escape_id("already/has/slashes")
'already_0x47_has_0x47_slashes'
"""
def replace(m: Match[str]) -> str:
return f"_0x{ord(m[0])}_"
return DBusManager._escape_re.sub(replace, id)
[docs] def property_dict(self) -> Dict[str, dict[str, Any]]:
config, player_manager = self.get_config_and_player_manager()
if config is None or player_manager is None:
return {}
state = config.state
has_current_song = state.current_song is not None
has_next_song = False
if state.repeat_type in (RepeatType.REPEAT_QUEUE, RepeatType.REPEAT_SONG):
has_next_song = True
elif has_current_song:
has_next_song = state.current_song_index < len(state.play_queue) - 1
active_playlist = self.get_active_playlist(state.active_playlist_id)
playlist_count = 0
try:
get_playlists_result = AdapterManager.get_playlists(allow_download=False)
if get_playlists_result.data_is_available:
playlist_count = len(get_playlists_result.result())
except Exception:
pass
playback_status_map: dict[tuple[bool, bool], str] = {
(False, False): "Stopped",
(False, True): "Stopped",
(True, False): "Paused",
(True, True): "Playing",
}
return {
"org.mpris.MediaPlayer2": {
"CanQuit": True,
"CanRaise": True,
"HasTrackList": True,
"Identity": "Sublime Music",
"DesktopEntry": "sublime-music",
"SupportedUriSchemes": [],
"SupportedMimeTypes": [],
},
"org.mpris.MediaPlayer2.Player": {
"PlaybackStatus": playback_status_map[player_manager.song_loaded, state.playing],
"LoopStatus": state.repeat_type.as_mpris_loop_status(),
"Rate": 1.0,
"Shuffle": state.shuffle_on,
"Metadata": self.get_mpris_metadata(
state.current_song_index,
state.play_queue,
)
if state.current_song
else {},
"Volume": 0.0 if state.is_muted else state.volume / 100,
"Position": (
"x",
int(
max(state.song_progress.total_seconds(), 0)
* self.second_microsecond_conversion
),
),
"MinimumRate": 1.0,
"MaximumRate": 1.0,
"CanGoNext": has_current_song and has_next_song,
"CanGoPrevious": has_current_song,
"CanPlay": True,
"CanPause": True,
"CanSeek": True,
"CanControl": True,
},
"org.mpris.MediaPlayer2.TrackList": {
"Tracks": DBusManager.get_dbus_playlist(state.play_queue),
"CanEditTracks": False,
},
"org.mpris.MediaPlayer2.Playlists": {
"PlaylistCount": playlist_count,
"Orderings": ["Alphabetical", "Created", "Modified"],
"ActivePlaylist": ("(b(oss))", active_playlist),
},
}
[docs] @functools.lru_cache(maxsize=10)
def get_active_playlist(self, active_playlist_id: Optional[str]) -> Tuple[bool, GLib.Variant]:
if not active_playlist_id or not AdapterManager.can_get_playlist_details():
return (False, GLib.Variant("(oss)", ("/", "", "")))
try:
playlist = AdapterManager.get_playlist_details(
active_playlist_id, allow_download=False
).result()
try:
cover_art = AdapterManager.get_cover_art_uri(
playlist.cover_art, "file", allow_download=False
).result()
except CacheMissError:
cover_art = ""
return (
True,
GLib.Variant(
"(oss)",
(
"/playlist/" + DBusManager._escape_id(playlist.id),
playlist.name,
cover_art,
),
),
)
except Exception:
logging.exception("Couldn't get playlist details")
return (False, GLib.Variant("(oss)", ("/", "", "")))
[docs] @staticmethod
@functools.lru_cache(maxsize=20)
def get_dbus_playlist(play_queue: Tuple[str, ...]) -> List[str]:
"""
Gets a playlist formatted for DBus. If multiples of the same element exist in
the queue, it will use ``/0`` after the song ID to differentiate between the
instances.
>>> DBusManager.get_dbus_playlist(("2", "1", "3", "1"))
['/song/2/0', '/song/1/0', '/song/3/0', '/song/1/1']
"""
seen_counts: DefaultDict[str, int] = defaultdict(int)
tracks = []
for song_id in play_queue:
num_seen = seen_counts[song_id]
tracks.append(f"/song/{DBusManager._escape_id(song_id)}/{num_seen}")
seen_counts[song_id] += 1
return tracks
diff_parse_re = re.compile(r"root\['(.*?)'\]\['(.*?)'\](?:\[.*\])?")
[docs] def property_diff(self):
new_property_dict = self.property_dict()
diff = DeepDiff(self.current_state, new_property_dict)
changes: dict[str, dict[str, Any]] = defaultdict(dict)
for path, change in diff.get("values_changed", {}).items():
if m := self.diff_parse_re.match(path):
interface, property_name = m.groups()
changes[interface][property_name] = change["new_value"]
else:
logging.warning(f"Couldn't parse path {path} for diff")
if diff.get("dictionary_item_added"):
changes = new_property_dict
for interface, changed_props in changes.items():
# If the metadata has changed, just make the entire Metadata object
# part of the update.
if "Metadata" in changed_props.keys():
changed_props["Metadata"] = new_property_dict[interface]["Metadata"]
# Special handling for when the position changes (a seek).
# Technically, I'm sending this signal too often, but I don't think
# it really matters.
if interface == "org.mpris.MediaPlayer2.Player" and "Position" in changed_props:
self.connection.emit_signal(
None,
"/org/mpris/MediaPlayer2",
interface,
"Seeked",
GLib.Variant("(x)", (changed_props["Position"][1],)),
)
# Do not emit the property change.
del changed_props["Position"]
# Special handling for when the track list changes.
# Technically, I'm supposed to use `TrackAdded` and `TrackRemoved`
# signals when minor changes occur, but the docs also say that:
#
# > It is left up to the implementation to decide when a change to
# > the track list is invasive enough that this signal should be
# > emitted instead of a series of TrackAdded and TrackRemoved
# > signals.
#
# So I think that any change is invasive enough that I should use
# this signal.
if interface == "org.mpris.MediaPlayer2.TrackList" and "Tracks" in changed_props:
track_list = changed_props["Tracks"]
if len(track_list) > 0:
current_track = new_property_dict["org.mpris.MediaPlayer2.Player"][
"Metadata"
].get("mpris:trackid", track_list[0])
self.connection.emit_signal(
None,
"/org/mpris/MediaPlayer2",
interface,
"TrackListReplaced",
GLib.Variant("(aoo)", (track_list, current_track)),
)
self.connection.emit_signal(
None,
"/org/mpris/MediaPlayer2",
"org.freedesktop.DBus.Properties",
"PropertiesChanged",
GLib.Variant(
"(sa{sv}as)",
(
interface,
{k: DBusManager.to_variant(v) for k, v in changed_props.items()},
[],
),
),
)
# Update state for next diff.
self.current_state = new_property_dict