#!/usr/bin/env python3
"""
Audio Player using GStreamer Command-Line Tools
Copyright (c) 2025 TAPS OSS
Project: https://github.com/TAPSOSS/Walrio
Licensed under the BSD-3-Clause License (see LICENSE file for details)
A simple audio player that uses GStreamer command-line tools (gst-launch-1.0)
for playback control.
"""
import sys
import os
import signal
import json
import argparse
import threading
import time
from pathlib import Path
import subprocess
import shlex
[docs]
class AudioPlayer:
"""
A GStreamer command-line based audio player with playback control.
This class provides audio playback functionality using GStreamer's
command-line tools (gst-launch-1.0) including play, pause, stop,
volume control, and looping capabilities. No PyGObject bindings required.
"""
[docs]
def __init__(self):
"""
Initialize the AudioPlayer with command-line GStreamer support.
Raises:
RuntimeError: If gst-launch-1.0 is not available.
"""
# Check if gst-launch-1.0 is available
try:
subprocess.run(['gst-launch-1.0', '--version'],
capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
raise RuntimeError("gst-launch-1.0 not found. Please install GStreamer.")
# Player state
self.process = None
self.is_playing = False
self.is_paused = False
self.current_file = None
self.duration = 0
self.position = 0
self.volume = 1.0
self.should_quit = False
self.loop_mode = 'none' # 'none', number (e.g. '3'), or 'infinite'
self.repeat_count = 0
self.interactive_mode = False # Track if we're in interactive mode
self.start_time = None # Track when playback started
self.pause_time = None # Track when playback was paused
self.position_thread = None # Thread for position tracking
# Setup signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
[docs]
def signal_handler(self, signum, frame):
"""
Handle termination signals gracefully.
Args:
signum (int): Signal number received.
frame: Current stack frame (unused).
"""
print(f"\nReceived signal {signum}, stopping playback...")
self.stop()
self.should_quit = True
def _handle_looping(self):
"""Handle looping functionality by monitoring process completion."""
if not self.process:
return
try:
# Wait for process to complete
self.process.wait()
# Check if we should loop
if self.should_quit or not self.is_playing:
return
should_loop = False
if self.loop_mode == 'infinite':
should_loop = True
elif self.loop_mode.isdigit():
if self.repeat_count < int(self.loop_mode):
should_loop = True
if should_loop:
self.repeat_count += 1
print(f"Looping song (repeat #{self.repeat_count})")
# Restart playback
self.is_playing = False # Reset state
self.play()
else:
print(f"Finished looping after {self.repeat_count} repeats")
self.is_playing = False
if self.interactive_mode:
print("player> ", end="", flush=True)
except Exception as e:
print(f"Error in looping handler: {e}")
[docs]
def load_file(self, filepath):
"""
Load an audio file for playback.
Args:
filepath (str): Path to the audio file.
Returns:
bool: True if file loaded successfully, False otherwise.
"""
absolute_path = os.path.abspath(filepath)
# Check if file exists
if not os.path.exists(absolute_path):
print(f"Error: File '{filepath}' not found.")
return False
# Check that it's a file and not directory
if not os.path.isfile(absolute_path):
print(f"Error: '{filepath}' is not a file.")
return False
# Store the file path for playback
self.current_file = absolute_path
# Reset position for new file
self.position = 0
self._stop_position_tracking()
# Get the duration of the file
self.duration = self._get_file_duration(absolute_path)
print(f"Loaded: {filepath}")
if self.duration > 0:
print(f"Duration: {self.duration:.1f} seconds")
return True
[docs]
def play(self):
"""
Start or resume playback.
Returns:
bool: True if playback started successfully, False otherwise.
"""
if not self.current_file:
print("Error: No file loaded")
return False
# If we're paused, just resume
if self.is_paused:
return self.resume()
# Stop any existing playback
if self.process:
self.stop()
# Reset repeat count for new playback session
self.repeat_count = 0
try:
# Build GStreamer pipeline command
cmd = [
'gst-launch-1.0',
'filesrc', f'location={shlex.quote(self.current_file)}',
'!', 'decodebin',
'!', 'audioconvert',
'!', 'audioresample',
'!', 'volume', f'volume={self.volume}',
'!', 'autoaudiosink'
]
# Start process
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE
)
self.is_playing = True
self.is_paused = False
print("Playback started")
# Start position tracking
self._start_position_tracking()
# Handle looping in a separate thread
if self.loop_mode != 'none':
threading.Thread(target=self._handle_looping, daemon=True).start()
return True
except Exception as e:
print(f"Error starting playback: {e}")
return False
[docs]
def pause(self):
"""
Pause playback.
Returns:
bool: True if paused successfully, False otherwise.
"""
if not self.is_playing or not self.process:
print("Player is not currently playing")
return False
try:
# Send SIGSTOP to pause the process
self.process.send_signal(signal.SIGSTOP)
# Store pause time to maintain position accuracy
self.pause_time = time.time()
self.is_paused = True
self.is_playing = False
print("Playback paused")
return True
except Exception as e:
print(f"Error pausing playback: {e}")
return False
[docs]
def resume(self):
"""
Resume paused playback.
Returns:
bool: True if resumed successfully, False otherwise.
"""
if not self.is_paused or not self.process:
print("Player is not currently paused")
return False
try:
# Send SIGCONT to resume the process
self.process.send_signal(signal.SIGCONT)
# Adjust start time to account for pause duration
if self.pause_time and self.start_time:
pause_duration = time.time() - self.pause_time
self.start_time += pause_duration
self.pause_time = None
self.is_paused = False
self.is_playing = True
print("Playback resumed")
return True
except Exception as e:
print(f"Error resuming playback: {e}")
return False
[docs]
def stop(self):
"""
Stop playback.
Returns:
bool: True if stopped successfully, False otherwise.
"""
if self.process:
try:
self.process.terminate()
# Give it a moment to terminate gracefully
try:
self.process.wait(timeout=2)
except subprocess.TimeoutExpired:
# Force kill if it doesn't terminate
self.process.kill()
self.process.wait()
except Exception as e:
print(f"Error stopping playback: {e}")
return False
finally:
self.process = None
self.is_playing = False
self.is_paused = False
self._stop_position_tracking()
return True
[docs]
def set_volume(self, volume):
"""
Set playback volume.
Args:
volume (float): Volume level between 0.0 and 1.0.
Returns:
bool: True if volume set successfully, False otherwise.
"""
if volume < 0.0 or volume > 1.0:
print("Error: Volume must be between 0.0 and 1.0")
return False
self.volume = volume
print(f"Volume set to {volume:.2f}")
# Note: Volume will be applied when next playback starts
return True
[docs]
def get_volume(self):
"""
Get current volume level.
Returns:
float: Current volume between 0.0 and 1.0.
"""
return self.volume
[docs]
def seek(self, position_seconds):
"""
Seek to a specific position in the audio.
Args:
position_seconds (float): Position to seek to in seconds.
Returns:
bool: True if seek successful, False otherwise.
"""
if not self.current_file:
print("Error: No file loaded")
return False
if position_seconds < 0:
print("Error: Seek position cannot be negative")
return False
if self.duration > 0 and position_seconds > self.duration:
print(f"Error: Seek position {position_seconds:.1f}s exceeds duration {self.duration:.1f}s")
return False
# Store current playing state
was_playing = self.is_playing
try:
# Stop current playback if any
if self.process:
self.stop()
# Use ffmpeg to create a temporary stream starting at the seek position
# This is more reliable than trying to seek with gst-launch-1.0
cmd = [
'ffmpeg',
'-ss', str(position_seconds), # Seek to position
'-i', self.current_file, # Input file
'-f', 'wav', # Output format
'-af', f'volume={self.volume}', # Apply volume
'-', # Output to stdout
]
# Pipe ffmpeg output to gst-launch-1.0 for playback
ffmpeg_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
gst_cmd = [
'gst-launch-1.0',
'fdsrc', 'fd=0',
'!', 'wavparse',
'!', 'audioconvert',
'!', 'audioresample',
'!', 'autoaudiosink'
]
self.process = subprocess.Popen(
gst_cmd,
stdin=ffmpeg_process.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Close the ffmpeg stdout in the parent process
ffmpeg_process.stdout.close()
# Update our position tracking
self.position = position_seconds
if was_playing:
self.is_playing = True
self.is_paused = False
# Adjust start time to account for the seek position
self.start_time = time.time() - position_seconds
# Start position tracking
if self.position_thread is None or not self.position_thread.is_alive():
self.position_thread = threading.Thread(target=self._update_position, daemon=True)
self.position_thread.start()
print(f"Seeked to {position_seconds:.1f} seconds and resumed playback")
else:
self.is_playing = False
self.is_paused = True
# Pause the process immediately after seeking
time.sleep(0.1)
self.process.send_signal(signal.SIGSTOP)
print(f"Seeked to {position_seconds:.1f} seconds")
return True
except Exception as e:
print(f"Error seeking to {position_seconds} seconds: {e}")
return False
[docs]
def get_position(self):
"""
Get current playback position.
Returns:
float: Current position in seconds.
"""
return self.position
[docs]
def get_duration(self):
"""
Get total duration of the current audio file.
Returns:
float: Total duration in seconds.
"""
return self.duration
[docs]
def set_loop_mode(self, mode):
"""
Set loop mode for playback.
Args:
mode (str): Loop mode - 'none', number (e.g. '3'), or 'infinite'.
Returns:
bool: True if loop mode set successfully, False otherwise.
"""
# Validate input
if mode != 'none' and mode != 'infinite' and not mode.isdigit():
print("Error: Loop mode must be 'none', a number (e.g. '3'), or 'infinite'")
return False
if mode.isdigit() and int(mode) <= 0:
print("Error: Loop count must be a positive number")
return False
self.loop_mode = mode
# Don't reset repeat_count here - let it reset on new playback
if mode == 'none':
print("Loop mode: Off")
elif mode == 'infinite':
print("Loop mode: Infinite repeats")
elif mode.isdigit():
print(f"Loop mode: Repeat {mode} times")
return True
[docs]
def get_loop_mode(self):
"""
Get current loop mode.
Returns:
str: Current loop mode setting.
"""
return self.loop_mode
[docs]
def get_repeat_count(self):
"""
Get number of times the current song has repeated.
Returns:
int: Number of repeats completed.
"""
return self.repeat_count
[docs]
def get_state(self):
"""
Get current player state information.
Returns:
dict: Dictionary containing player state including:
- is_playing (bool): Whether audio is currently playing
- is_paused (bool): Whether audio is paused
- current_file (str): Path to current file
- position (float): Current position in seconds
- duration (float): Total duration in seconds
- volume (float): Current volume level
- loop_mode (str): Current loop mode
- repeat_count (int): Number of repeats completed
"""
return {
"is_playing": self.is_playing,
"is_paused": self.is_paused,
"current_file": self.current_file,
"position": self.get_position(),
"duration": self.get_duration(),
"volume": self.get_volume(),
"loop_mode": self.loop_mode,
"repeat_count": self.repeat_count
}
[docs]
def run_interactive(self):
"""
Run the player in interactive mode with command input.
Starts a command-line interface allowing real-time control
of playback through user commands.
"""
self.interactive_mode = True
print("\nInteractive Audio Player")
print("Commands:")
print(" play/p - Start/resume playback")
print(" pause/ps - Pause playback")
print(" stop/s - Stop playback")
print(" volume/v <0.0-1.0> - Set volume")
print(" seek/sk <seconds> - Seek to position")
print(" loop/l <none|number|infinite> - Set loop mode (e.g. 'loop 3' or 'loop infinite')")
print(" status/st - Show current status")
print(" quit/q - Quit player")
print()
# Handle input directly in main thread
self.handle_input()
def _update_position(self):
"""Update position tracking while playing."""
while self.is_playing and not self.should_quit:
if self.start_time and not self.is_paused:
elapsed = time.time() - self.start_time
self.position = elapsed
time.sleep(0.1) # Update every 100ms
def _start_position_tracking(self):
"""Start position tracking in a separate thread."""
if self.position_thread and self.position_thread.is_alive():
return
self.start_time = time.time()
self.position_thread = threading.Thread(target=self._update_position, daemon=True)
self.position_thread.start()
def _stop_position_tracking(self):
"""Stop position tracking."""
self.start_time = None
self.pause_time = None
if self.position_thread:
self.position_thread = None
def _get_file_duration(self, filepath):
"""
Get the duration of an audio file using ffprobe.
Args:
filepath (str): Path to the audio file.
Returns:
float: Duration in seconds, or 0 if unable to determine.
"""
try:
# Use ffprobe to get duration
cmd = [
'ffprobe',
'-v', 'quiet',
'-show_entries', 'format=duration',
'-of', 'csv=p=0',
filepath
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0 and result.stdout.strip():
duration = float(result.stdout.strip())
return duration
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, ValueError, FileNotFoundError):
# Fallback: try using gst-discoverer-1.0 if ffprobe fails
try:
cmd = [
'gst-discoverer-1.0',
'-v',
filepath
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
# Parse duration from gst-discoverer output
for line in result.stdout.split('\n'):
if 'Duration:' in line:
# Format is usually "Duration: 0:03:45.123456789"
duration_str = line.split('Duration:')[1].strip()
# Convert time format to seconds
time_parts = duration_str.split(':')
if len(time_parts) >= 3:
hours = float(time_parts[0])
minutes = float(time_parts[1])
seconds = float(time_parts[2])
total_seconds = hours * 3600 + minutes * 60 + seconds
return total_seconds
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, ValueError, FileNotFoundError):
pass
return 0.0
[docs]
def play_audio(filepath):
"""
Simple playback function for command-line compatibility.
Args:
filepath (str): Path to the audio file to play.
Returns:
bool: True if playback completed successfully, False otherwise.
"""
try:
player = AudioPlayer()
if not player.load_file(filepath):
return False
if not player.play():
return False
# Wait for playback to complete or user interruption
try:
while player.is_playing and not player.should_quit:
time.sleep(0.1)
except KeyboardInterrupt:
print("\nPlayback interrupted by user.")
finally:
player.stop()
return True
except Exception as e:
print(f"Error: {e}")
return False
[docs]
def main():
"""
Main function to handle command line arguments and play audio.
Parses command-line arguments and initiates appropriate playback mode
(simple, interactive, or daemon).
Examples:
Simple playback:
python player.py /path/to/song.mp3
Interactive mode with controls:
python player.py --interactive /path/to/song.mp3
Daemon mode for external control:
python player.py --daemon /path/to/song.mp3
Using test file:
python player.py ../../testing_files/test.mp3
"""
parser = argparse.ArgumentParser(
description="Audio Player using GStreamer with full playback control",
epilog="Examples:\n"
" python player.py /path/to/song.mp3 # Simple playback\n"
" python player.py --interactive /path/to/song.mp3 # Interactive mode\n"
" python player.py --daemon /path/to/song.mp3 # Daemon mode (for Electron)",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
"filepath",
nargs='?',
help="Path to the audio file to play"
)
parser.add_argument(
"--interactive", "-i",
action="store_true",
help="Run in interactive mode with playback controls"
)
parser.add_argument(
"--daemon", "-d",
action="store_true",
help="Run in daemon mode (for external control)"
)
parser.add_argument(
"--command", "-c",
choices=['play', 'pause', 'stop', 'resume', 'status'],
help="Send command to running player instance"
)
parser.add_argument(
"--volume", "-v",
type=float,
help="Set volume (0.0 to 1.0)"
)
parser.add_argument(
"--seek", "-s",
type=float,
help="Seek to position in seconds"
)
parser.add_argument(
"--loop", "-l",
default='none',
help="Set loop mode: 'none', number (e.g. '3'), or 'infinite'"
)
args = parser.parse_args()
try:
player = AudioPlayer()
# Handle different modes
if args.interactive:
if args.filepath:
if not player.load_file(args.filepath):
sys.exit(1)
# Set loop mode if specified
if args.loop:
player.set_loop_mode(args.loop)
player.run_interactive()
elif args.daemon:
# Daemon mode - load file and wait for external commands
if args.filepath:
if not player.load_file(args.filepath):
sys.exit(1)
# Set loop mode if specified
if args.loop:
player.set_loop_mode(args.loop)
player.play()
# Wait for playback or user interruption
try:
while not player.should_quit:
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
player.stop()
else:
# Simple playback mode (backward compatibility)
if not args.filepath:
print("Error: filepath required for simple playback mode")
parser.print_help()
sys.exit(1)
# Apply volume if specified
if args.volume is not None:
player.set_volume(args.volume)
# Set loop mode if specified
if args.loop:
player.set_loop_mode(args.loop)
success = play_audio(args.filepath)
sys.exit(0 if success else 1)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
# run file
if __name__ == "__main__":
main()