Source code for sublime_music.players.mpv

import threading
from datetime import timedelta
from typing import Callable, Dict, Optional, Tuple, Type, Union, cast

import mpv

from ..adapters.api_objects import Song
from .base import Player, PlayerDeviceEvent, PlayerEvent

REPLAY_GAIN_KEY = "Replay Gain"
GAPLESS_PLAYBACK_KEY = "Gapless Playback"


[docs]class MPVPlayer(Player): enabled = True name = "Local Playback" can_start_playing_with_no_latency = True supported_schemes = {"http", "https", "file"} song_loaded = False _progress_value_lock = threading.Lock() _progress_value_count = 0 _volume = 100.0 _muted = False _is_mock = False
[docs] @staticmethod def get_configuration_options() -> Dict[str, Union[Type, Tuple[str, ...]]]: return { REPLAY_GAIN_KEY: ("Disabled", "Track", "Album"), GAPLESS_PLAYBACK_KEY: ("Disabled", "Enabled"), }
[docs] def __init__( self, on_timepos_change: Callable[[Optional[float]], None], on_track_end: Callable[[], None], on_player_event: Callable[[PlayerEvent], None], player_device_change_callback: Callable[[PlayerDeviceEvent], None], config: Dict[str, Union[str, int, bool]], ): self.mpv = mpv.MPV() if MPVPlayer._is_mock: self.mpv.audio_device = "null" self.mpv.audio_client_name = "sublime-music" self.change_settings(config) @self.mpv.property_observer("time-pos") def time_observer(_, value: Optional[float]): on_timepos_change(value) if value is None and self._progress_value_count > 1: on_track_end() with self._progress_value_lock: self._progress_value_count = 0 if value: with self._progress_value_lock: self._progress_value_count += 1 @self.mpv.property_observer("demuxer-cache-time") def cache_size_observer(_, value: Optional[float]): on_player_event( PlayerEvent( PlayerEvent.EventType.STREAM_CACHE_PROGRESS_CHANGE, "this device", stream_cache_duration=value, ) ) # Indicate to the UI that we exist. player_device_change_callback( PlayerDeviceEvent( PlayerDeviceEvent.Delta.ADD, type(self), "this device", "This Device" ) )
[docs] def change_settings(self, config: Dict[str, Union[str, int, bool]]): self.config = config self.mpv.replaygain = { "Disabled": "no", "Track": "track", "Album": "album", }.get(cast(str, config.get(REPLAY_GAIN_KEY, "Disabled")), "no")
[docs] def refresh_players(self): # Don't do anything pass
[docs] def set_current_device_id(self, device_id: str): # Don't do anything beacuse it should always be the "this device" ID. pass
[docs] def shutdown(self): pass
[docs] def reset(self): self.song_loaded = False with self._progress_value_lock: self._progress_value_count = 0
@property def playing(self) -> bool: return not self.mpv.pause @property def gapless_playback(self) -> bool: return self.config.get(GAPLESS_PLAYBACK_KEY) == "Enabled"
[docs] def get_volume(self) -> float: return self._volume
[docs] def set_volume(self, volume: float): if not self._muted: self.mpv.volume = volume self._volume = volume
[docs] def get_is_muted(self) -> bool: return self._muted
[docs] def set_muted(self, muted: bool): self.mpv.volume = 0 if muted else self._volume self._muted = muted
[docs] def play_media(self, uri: str, progress: timedelta, song: Song): with self._progress_value_lock: self._progress_value_count = 0 # Clears everything except the currently-playing song self.mpv.command("playlist-clear") options = { "force-seekable": "yes", "start": str(progress.total_seconds()), } self.mpv.command( "loadfile", uri, "replace", ",".join(f"{k}={v}" for k, v in options.items()) ) self.mpv.pause = False self.song_loaded = True
[docs] def pause(self): self.mpv.pause = True
[docs] def play(self): self.mpv.pause = False
[docs] def seek(self, position: timedelta): self.mpv.seek(str(position.total_seconds()), "absolute")
[docs] def next_media_cached(self, uri: str, song: Song): if not self.gapless_playback: return # Ensure the only 2 things in the playlist are the current song # and the next song for gapless playback self.mpv.command("playlist-clear") self.mpv.command("loadfile", uri, "append")