Source code for mopidy.audio.scan

import logging
import time
from enum import IntEnum
from pathlib import Path
from typing import Any, NamedTuple, cast

from mopidy import exceptions
from mopidy.audio import tags as tags_lib
from mopidy.audio import utils
from mopidy.internal import log
from mopidy.internal.gi import Gst, GstPbutils


class GstElementFactoryListType(IntEnum):
    DECODER = 1 << 0
    AUDIO = 1 << 50
    DEMUXER = 1 << 5
    DEPAYLOADER = 1 << 8
    PARSER = 1 << 6


class GstAutoplugSelectResult(IntEnum):
    TRY = 0
    EXPOSE = 1
    SKIP = 2


class _Result(NamedTuple):
    uri: str
    tags: dict[str, Any]
    duration: int | None
    seekable: bool
    mime: str | None
    playable: bool


logger = logging.getLogger(__name__)


def _trace(*args, **kwargs):
    logger.log(log.TRACE_LOG_LEVEL, *args, **kwargs)


# TODO: replace with a scan(uri, timeout=1000, proxy_config=None)?
[docs] class Scanner: """Helper to get tags and other relevant info from URIs. :param timeout: timeout for scanning a URI in ms :param proxy_config: dictionary containing proxy config strings. :type event: int """ def __init__( self, timeout: int = 1000, proxy_config: dict[str, Any] | None = None, ) -> None: self._timeout_ms = int(timeout) self._proxy_config = proxy_config or {}
[docs] def scan( self, uri: str, timeout: float | None = None, ) -> _Result: """Scan the given uri collecting relevant metadata. :param uri: URI of the resource to scan. :type uri: string :param timeout: timeout for scanning a URI in ms. Defaults to the ``timeout`` value used when creating the scanner. :type timeout: int :return: A named tuple containing ``(uri, tags, duration, seekable, mime)``. ``tags`` is a dictionary of lists for all the tags we found. ``duration`` is the length of the URI in milliseconds, or :class:`None` if the URI has no duration. ``seekable`` is boolean. indicating if a seek would succeed. """ timeout = int(timeout or self._timeout_ms) tags, duration, seekable, mime = None, None, None, None pipeline, signals = _setup_pipeline(uri, self._proxy_config) try: _start_pipeline(pipeline) tags, mime, have_audio, duration = _process(pipeline, timeout) seekable = _query_seekable(pipeline) finally: signals.clear() pipeline.set_state(Gst.State.NULL) del pipeline return _Result(uri, tags, duration, seekable, mime, have_audio)
# Turns out it's _much_ faster to just create a new pipeline for every as # decodebins and other elements don't seem to take well to being reused. def _setup_pipeline(uri: str, proxy_config=None) -> tuple[Gst.Pipeline, utils.Signals]: src = Gst.Element.make_from_uri(Gst.URIType.SRC, uri) if not src: msg = f"GStreamer can not open: {uri}" raise exceptions.ScannerError(msg) if proxy_config: utils.setup_proxy(src, proxy_config) signals = utils.Signals() pipeline = Gst.ElementFactory.make("pipeline") if pipeline is None: msg = "Failed to create GStreamer pipeline element." raise exceptions.AudioException(msg) pipeline = cast(Gst.Pipeline, pipeline) pipeline.add(src) if _has_src_pads(src): _setup_decodebin(src, src.get_static_pad("src"), pipeline, signals) elif _has_dynamic_src_pad(src): signals.connect(src, "pad-added", _setup_decodebin, pipeline, signals) else: msg = "No pads found in source element." raise exceptions.ScannerError(msg) return pipeline, signals def _has_src_pads(element) -> bool: pads = [] element.iterate_src_pads().foreach(pads.append) return bool(pads) def _has_dynamic_src_pad(element) -> bool: for template in element.get_pad_template_list(): if ( template.direction == Gst.PadDirection.SRC and template.presence == Gst.PadPresence.SOMETIMES ): return True return False def _setup_decodebin(element, pad, pipeline, signals) -> None: # noqa: ARG001 typefind = Gst.ElementFactory.make("typefind") if typefind is None: msg = "Failed to create GStreamer typefind element." raise exceptions.AudioException(msg) decodebin = Gst.ElementFactory.make("decodebin") if decodebin is None: msg = "Failed to create GStreamer decodebin element." raise exceptions.AudioException(msg) for el in (typefind, decodebin): pipeline.add(el) el.sync_state_with_parent() pad.link(typefind.get_static_pad("sink")) typefind.link(decodebin) signals.connect(typefind, "have-type", _have_type, decodebin) signals.connect(decodebin, "pad-added", _pad_added, pipeline) signals.connect(decodebin, "autoplug-select", _autoplug_select) def _have_type( element: Gst.Element, _probability: int, caps: Gst.Caps, decodebin: Gst.Bin, ) -> None: decodebin.set_property("sink-caps", caps) struct = Gst.Structure.new_empty("have-type") struct.set_value("caps", caps.get_structure(0)) element_bus = element.get_bus() if element_bus is None: msg = "Failed to get bus of GStreamer element." raise exceptions.AudioException(msg) message = Gst.Message.new_application(element, struct) if message is None: msg = "Failed to create GStreamer message." raise exceptions.AudioException(msg) element_bus.post(message) def _pad_added( element: Gst.Element, pad: Gst.Pad, pipeline: Gst.Pipeline, ) -> None: fakesink = Gst.ElementFactory.make("fakesink") if fakesink is None: msg = "Failed to create GStreamer fakesink element." raise exceptions.AudioException(msg) fakesink.set_property("sync", False) pipeline.add(fakesink) fakesink.sync_state_with_parent() fakesink_sink = fakesink.get_static_pad("sink") if fakesink_sink is None: msg = "Failed to get sink pad of GStreamer fakesink." raise exceptions.AudioException(msg) pad.link(fakesink_sink) raw_caps = Gst.Caps.from_string("audio/x-raw") assert raw_caps if pad.query_caps().is_subset(raw_caps): # Probably won't happen due to autoplug-select fix, but lets play it # safe until we've tested more. struct = Gst.Structure.new_empty("have-audio") element_bus = element.get_bus() if element_bus is None: msg = "Failed to get bus of GStreamer element." raise exceptions.AudioException(msg) message = Gst.Message.new_application(element, struct) if message is None: msg = "Failed to create GStreamer message." raise exceptions.AudioException(msg) element_bus.post(message) def _autoplug_select( element: Gst.Element, _pad: Gst.Pad, _caps: Gst.Caps, factory: Gst.ElementFactory, ) -> GstAutoplugSelectResult: if factory.list_is_type( GstElementFactoryListType.DECODER | GstElementFactoryListType.AUDIO, ): struct = Gst.Structure.new_empty("have-audio") element_bus = element.get_bus() if element_bus is None: msg = "Failed to get bus of GStreamer element." raise exceptions.AudioException(msg) message = Gst.Message.new_application(element, struct) if message is None: msg = "Failed to create GStreamer message." raise exceptions.AudioException(msg) element_bus.post(message) if not factory.list_is_type( GstElementFactoryListType.DEMUXER | GstElementFactoryListType.DEPAYLOADER | GstElementFactoryListType.PARSER, ): return GstAutoplugSelectResult.EXPOSE return GstAutoplugSelectResult.TRY def _start_pipeline(pipeline: Gst.Pipeline) -> None: result = pipeline.set_state(Gst.State.PAUSED) if result == Gst.StateChangeReturn.NO_PREROLL: pipeline.set_state(Gst.State.PLAYING) def _query_duration(pipeline: Gst.Pipeline) -> tuple[bool, int | None]: success, duration = pipeline.query_duration(Gst.Format.TIME) if not success: duration = None # Make sure error case preserves None. elif duration < 0: duration = None # Stream without duration. else: duration = int(duration // Gst.MSECOND) return success, duration def _query_seekable(pipeline: Gst.Pipeline) -> bool: query = Gst.Query.new_seeking(Gst.Format.TIME) pipeline.query(query) return query.parse_seeking()[1] def _process( # noqa: C901, PLR0911, PLR0912, PLR0915 pipeline: Gst.Pipeline, timeout_ms: int, ) -> tuple[dict[str, Any], str | None, bool, int | None]: bus = pipeline.get_bus() tags = {} mime: str | None = None have_audio = False missing_message = None duration = None types = ( Gst.MessageType.ELEMENT | Gst.MessageType.APPLICATION | Gst.MessageType.ERROR | Gst.MessageType.EOS | Gst.MessageType.ASYNC_DONE | Gst.MessageType.DURATION_CHANGED | Gst.MessageType.TAG ) timeout = timeout_ms start = int(time.time() * 1000) while timeout > 0: msg = bus.timed_pop_filtered(timeout * Gst.MSECOND, types) if msg is None: break structure = msg.get_structure() if logger.isEnabledFor(log.TRACE_LOG_LEVEL) and structure: debug_text = structure.to_string() if len(debug_text) > 77: debug_text = debug_text[:77] + "..." _trace("element %s: %s", msg.src.get_name(), debug_text) if msg.type == Gst.MessageType.ELEMENT: if GstPbutils.is_missing_plugin_message(msg): missing_message = msg elif msg.type == Gst.MessageType.APPLICATION: if structure and structure.get_name() == "have-type": caps = cast(Gst.Caps | None, structure.get_value("caps")) if caps: mime = cast( str, caps.get_name(), # pyright: ignore[reportAttributeAccessIssue] ) if mime.startswith("text/") or mime == "application/xml": return tags, mime, have_audio, duration elif structure and structure.get_name() == "have-audio": have_audio = True elif msg.type == Gst.MessageType.ERROR: error, _debug = msg.parse_error() if ( missing_message and not mime and ( (structure := missing_message.get_structure()) and (caps := structure.get_value("detail")) ) ): # gstreamer 1.25.0 to 1.26.2 (inclusive) broke the accessing # `caps.get_structure(0).get_name()`, but work when wrapping # the object in a context manager. gstreamer 1.24.x does not # support using the structure as a context manager at all. struct = caps.get_structure(0) if hasattr(struct, "__enter__"): with struct as _struct: mime = _struct.get_name() else: mime = struct.get_name() if mime: return tags, mime, have_audio, duration raise exceptions.ScannerError(str(error)) elif msg.type == Gst.MessageType.EOS: return tags, mime, have_audio, duration elif msg.type == Gst.MessageType.ASYNC_DONE: success, duration = _query_duration(pipeline) if tags and success: return tags, mime, have_audio, duration # Don't try workaround for non-seekable sources such as mmssrc: if not _query_seekable(pipeline): return tags, mime, have_audio, duration # Workaround for upstream bug which causes tags/duration to arrive # after pre-roll. We get around this by starting to play the track # and then waiting for a duration change. # https://bugzilla.gnome.org/show_bug.cgi?id=763553 logger.debug("Using workaround for duration missing before play.") result = pipeline.set_state(Gst.State.PLAYING) if result == Gst.StateChangeReturn.FAILURE: return tags, mime, have_audio, duration elif msg.type == Gst.MessageType.DURATION_CHANGED and tags: # VBR formats sometimes seem to not have a duration by the time we # go back to paused. So just try to get it right away. success, duration = _query_duration(pipeline) pipeline.set_state(Gst.State.PAUSED) if success: return tags, mime, have_audio, duration elif msg.type == Gst.MessageType.TAG: taglist = msg.parse_tag() # Note that this will only keep the last tag. tags.update(tags_lib.convert_taglist(taglist)) timeout = timeout_ms - (int(time.time() * 1000) - start) msg = f"Timeout after {timeout_ms:d}ms" raise exceptions.ScannerError(msg) if __name__ == "__main__": import sys from mopidy.internal import path logging.basicConfig( format="%(asctime)-15s %(levelname)s %(message)s", level=log.TRACE_LOG_LEVEL, ) scanner = Scanner(5000) for uri in sys.argv[1:]: if not Gst.uri_is_valid(uri): uri = path.path_to_uri(Path(uri).resolve()) try: result = scanner.scan(uri) for key in ("uri", "mime", "duration", "playable", "seekable"): value = getattr(result, key) print(f"{key:<20} {value}") # noqa: T201 print("tags") # noqa: T201 for tag, value in result.tags.items(): line = f"{tag:<20} {value}" if len(line) > 77: line = line[:77] + "..." print(line) # noqa: T201 except exceptions.ScannerError as error: print(f"{uri}: {error}") # noqa: T201