import base64
import io
import logging
import mimetypes
import multiprocessing
import os
import socket
from datetime import timedelta
from typing import Any, Callable, Dict, Optional, Set, Tuple, Type, Union, cast
from urllib.parse import urlparse
from uuid import UUID
import bottle
import pychromecast
from gi.repository import GLib
from ..adapters import AdapterManager
from ..adapters.api_objects import Song
from .base import Player, PlayerDeviceEvent, PlayerEvent
# Configuration key of the boolean option that makes ChromecastPlayer start its
# local HTTP server and accept "file" URIs in addition to "http" and "https".
SERVE_FILES_KEY = "Serve Local Files to Chromecasts on the LAN"
# Configuration key of the integer option whose value is passed as the port of
# that local HTTP server.
LAN_PORT_KEY = "LAN Server Port Number"
# Player implementation that controls Chromecast devices through pychromecast.
[docs]class ChromecastPlayer(Player):
# Name of this player type.
# NOTE(review): presumably used as the display name -- confirm against Player.
name = "Chromecast"
# NOTE(review): presumably tells callers that playback does not begin
# immediately after it is requested -- confirm against Player.
can_start_playing_with_no_latency = False
@property
def enabled(self) -> bool:
return True
@staticmethod
def get_configuration_options() -> Dict[str, Union[Type, Tuple[str, ...]]]:
    """
    Describe the settings this player exposes.

    :returns: a mapping from each option's key to the type of its value: ``bool``
        for the "serve local files" option and ``int`` for the LAN server port.
    """
    options: Dict[str, Union[Type, Tuple[str, ...]]] = {}
    options[SERVE_FILES_KEY] = bool
    options[LAN_PORT_KEY] = int
    return options
@property
def supported_schemes(self) -> Set[str]:
    """
    The URI schemes this player accepts.

    :returns: ``http`` and ``https`` always, plus ``file`` when the "serve local
        files" option is set to a truthy value in the current configuration.
    """
    if self.config.get(SERVE_FILES_KEY):
        return {"http", "https", "file"}
    return {"http", "https"}
# Class-level default of 0.0.
# NOTE(review): presumably the last known playback position; it is not read or
# written in the visible part of this class -- confirm against the rest of the file.
_timepos = 0.0
def __init__(
    self,
    on_timepos_change: Callable[[Optional[float]], None],
    on_track_end: Callable[[], None],
    on_player_event: Callable[[PlayerEvent], None],
    player_device_change_callback: Callable[[PlayerDeviceEvent], None],
    config: Dict[str, Union[str, int, bool]],
):
    """
    Store the callbacks, apply the configuration and start looking for devices.

    :param on_timepos_change: callback stored on the instance.
    :param on_track_end: callback stored on the instance.
    :param on_player_event: callback that receives ``PlayerEvent`` objects, such
        as the volume change emitted by ``new_cast_status``.
    :param player_device_change_callback: callback that receives a
        ``PlayerDeviceEvent`` whenever a device is added or removed.
    :param config: the option values, handed to ``change_settings``.
    """
    # Callbacks supplied by the owner of this player.
    self.on_timepos_change = on_timepos_change
    self.on_track_end = on_track_end
    self.on_player_event = on_player_event
    self.player_device_change_callback = player_device_change_callback

    # `change_settings` inspects `server_process`, so it has to exist first.
    self.server_process: Optional[multiprocessing.Process] = None
    self.change_settings(config)

    # Device bookkeeping must be in place before discovery is started.
    self._chromecasts: Dict[UUID, pychromecast.Chromecast] = {}
    self._current_chromecast: Optional[pychromecast.Chromecast] = None
    self.chromecast_browser = None
    self.refresh_players()
def chromecast_discovered_callback(self, chromecast: Any):
    """
    Register a newly discovered Chromecast and announce it.

    The device is stored under its UUID, then an ``ADD`` ``PlayerDeviceEvent``
    carrying the UUID (as a string) and the friendly name is passed to
    ``player_device_change_callback``.

    :param chromecast: the discovered device; treated as a
        ``pychromecast.Chromecast``.
    """
    device = cast(pychromecast.Chromecast, chromecast)
    info = device.cast_info
    self._chromecasts[info.uuid] = device

    added = PlayerDeviceEvent(
        PlayerDeviceEvent.Delta.ADD,
        type(self),
        str(info.uuid),
        info.friendly_name,
    )
    self.player_device_change_callback(added)
def change_settings(self, config: Dict[str, Union[str, int, bool]]):
    """
    Store the new configuration and bring the local file server in line with it.

    A server process that is already running is always stopped first. A new one
    is started only when the "serve local files" option is truthy, so switching
    the option off actually stops the server instead of leaving it running.

    :param config: the option values, keyed by ``SERVE_FILES_KEY`` and
        ``LAN_PORT_KEY``.
    """
    self.config = config

    # Stop the previous server whatever the new setting is: it either has to be
    # restarted (the port may have changed) or must no longer run at all.
    if self.server_process is not None:
        try:
            self.server_process.terminate()
        except Exception:
            # Best effort: a server that cannot be stopped must not prevent the
            # new settings from being applied, but leave a trace of the failure.
            logging.warning(
                "Failed to terminate the local file server process", exc_info=True
            )
        self.server_process = None

    if not self.config.get(SERVE_FILES_KEY):
        return

    self.server_process = multiprocessing.Process(
        target=self._run_server_process,
        args=("0.0.0.0", self.config.get(LAN_PORT_KEY)),
    )
    self.server_process.start()
def refresh_players(self):
    """
    Forget all known Chromecasts and start a fresh, non-blocking discovery.

    A ``REMOVE`` ``PlayerDeviceEvent`` is sent for every device currently known,
    the device table is emptied, and the object returned by
    ``pychromecast.get_chromecasts`` is kept in ``chromecast_browser``. Devices
    found by the discovery are reported to ``chromecast_discovered_callback``.
    """
    for device_id, device in self._chromecasts.items():
        removed = PlayerDeviceEvent(
            PlayerDeviceEvent.Delta.REMOVE,
            type(self),
            str(device_id),
            device.cast_info.friendly_name,
        )
        self.player_device_change_callback(removed)

    self._chromecasts = {}

    browser = pychromecast.get_chromecasts(
        blocking=False, callback=self.chromecast_discovered_callback
    )
    self.chromecast_browser = browser
[docs] def set_current_device_id(self, device_id: str):
self._current_chromecast = self._chromecasts[UUID(device_id)]
self._current_chromecast.media_controller.register_status_listener(self)
self._current_chromecast.register_status_listener(self)
self._current_chromecast.wait()
def new_cast_status(self, status: Any):
    """
    React to a cast status update from the current Chromecast.

    A ``VOLUME_CHANGE`` ``PlayerEvent`` is emitted with the volume scaled by 100,
    or 0 when the status says the device is muted.

    :param status: the status object; its ``volume_level``, ``volume_muted`` and
        ``session_id`` attributes are read.
    """
    assert self._current_chromecast

    device_id = str(self._current_chromecast.cast_info.uuid)
    if status.volume_muted:
        volume = 0
    else:
        volume = status.volume_level * 100

    self.on_player_event(
        PlayerEvent(PlayerEvent.EventType.VOLUME_CHANGE, device_id, volume=volume)
    )

    # This happens when the Chromecast is first activated or after "Stop Casting" is
    # pressed in the Google Home app. Reset `song_loaded` so that it calls
    # `play_media` the next time around rather than trying to toggle the play state.
    if status.session_id is None:
        self.song_loaded = False
# Class-level counter that starts at 0.
# NOTE(review): nothing in the visible part of this class reads or updates it;
# confirm its purpose against the rest of the file.
time_increment_order_token = 0
[docs] def shutdown(self):
if self.server_process:
self.server_process.terminate()
try:
assert self._current_chromecast
self._current_chromecast.quit_app()
except Exception:
pass
# Character arrays created once at class level with multiprocessing.Array. The
# HTTP handler in `_run_server_process` reads them to learn which song to serve
# and which URL token to accept.
# NOTE(review): the code that writes them is not in the visible part of the class.
_serving_song_id = multiprocessing.Array("c", 1024) # huge buffer, just in case
_serving_token = multiprocessing.Array("c", 16)
def _run_server_process(self, host: str, port: int):
    """
    Run the bottle HTTP server that hands the current song to Chromecasts.

    This call blocks in ``bottle.run``; ``change_settings`` uses it as the target
    of a separate process. Two routes are served:

    * ``/`` returns a short HTML page describing the server.
    * ``/s/<token>`` returns the bytes of the song whose ID is stored in
      ``_serving_song_id``, provided ``token`` matches ``_serving_token``;
      otherwise it answers with HTTP 401.

    :param host: the address to bind to.
    :param port: the port to listen on.
    """
    # Imported here so this block does not depend on a new file-level import.
    import hmac

    app = bottle.Bottle()

    @app.route("/")
    def index() -> str:
        return """
<h1>Sublime Music Local Music Server</h1>
<p>
Sublime Music uses this port as a server for serving music to
Chromecasts on the same LAN.
</p>
"""

    @app.route("/s/<token>")
    def stream_song(token: str) -> bytes:
        # The token arrives from the network, so compare it in constant time.
        # typing doesn't support multiprocessing.Value very well
        expected_token = self._serving_token.value  # type: ignore
        if not hmac.compare_digest(token.encode(), expected_token):
            raise bottle.HTTPError(status=401, body="Invalid token.")

        song = AdapterManager.get_song_details(
            self._serving_song_id.value.decode()  # type: ignore
        ).result()
        filename = AdapterManager.get_song_file_uri(song)

        # `filename` is a "file://" URI; drop that 7-character prefix to get the
        # path. The file's bytes are returned directly, without an intermediate
        # in-memory copy.
        with open(filename[7:], "rb") as fin:
            song_bytes = fin.read()

        # `guess_type` yields None for unknown extensions; fall back to a generic
        # binary type rather than sending an invalid Content-Type header.
        content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
        bottle.response.set_header("Content-Type", content_type)
        # NOTE(review): byte ranges are advertised, but this handler always
        # returns the whole file.
        bottle.response.set_header("Accept-Ranges", "bytes")
        return song_bytes

    bottle.run(app, host=host, port=port)
@property
def playing(self) -> bool:
if not self._current_chromecast or not self._current_chromecast.media_controller:
return False
return self._current_chromecast.media_controller.status.player_is_playing
[docs] def get_volume(self) -> float:
if self._current_chromecast and self._current_chromecast.status:
# The volume is in the range [0, 1]. Multiply by 100 to get to [0, 100].
return self._current_chromecast.status.volume_level * 100
else:
return 100
[docs] def set_volume(self, volume: float):
if self._current_chromecast:
# volume value is in [0, 100]. Convert to [0, 1] for Chromecast.
self._current_chromecast.set_volume(volume / 100)
[docs] def get_is_muted(self) -> bool:
if not self._current_chromecast:
return False
return self._current_chromecast.volume_muted
[docs] def set_muted(self, muted: bool):
if not self._current_chromecast:
return
self._current_chromecast.set_volume_muted(muted)
[docs] def pause(self):
if self._current_chromecast and self._current_chromecast.media_controller:
self._current_chromecast.media_controller.pause()
[docs] def play(self):
if self._current_chromecast and self._current_chromecast.media_controller:
self._current_chromecast.media_controller.play()
[docs] def seek(self, position: timedelta):
if not self._current_chromecast:
return
do_pause = not self.playing
self._current_chromecast.media_controller.seek(position.total_seconds())
if do_pause:
self.pause()
def _wait_for_playing(self):
pass