#!/usr/bin/env python3
"""
Audio Queue Manager
Copyright (c) 2025 TAPS OSS
Project: https://github.com/TAPSOSS/Walrio
Licensed under the BSD-3-Clause License (see LICENSE file for details)
A script that manages a queue of songs from the audio library and plays them using player.py and a db file (assumed to be generated by database.py).
"""
import sys
import os
import sqlite3
import argparse
import random
import hashlib
import time
from pathlib import Path
from .player import play_audio
from .playlist import load_m3u_playlist
from enum import Enum
# Try to import database functions for auto-adding missing songs
try:
from .database import extract_metadata, get_file_hash
DATABASE_AVAILABLE = True
except ImportError:
print("Note: database.py functions not available. Auto-adding missing songs disabled.")
DATABASE_AVAILABLE = False
# Default database path
DEFAULT_DB_PATH = "walrio_library.db"
[docs]
class RepeatMode(Enum):
"""Repeat modes for audio playback"""
OFF = "off"
TRACK = "track"
QUEUE = "queue"
[docs]
class QueueManager:
"""
Queue manager that handles dynamic repeat modes for audio playback.
Provides playlist-level loop control with dynamic mode switching.
"""
[docs]
def __init__(self, songs=None):
"""
Initialize the QueueManager.
Args:
songs (list, optional): List of song dictionaries or database records.
Defaults to None (empty list).
"""
self.songs = songs or []
self.current_index = 0
self.repeat_mode = RepeatMode.OFF
self.shuffle_mode = False
self.playback_history = [] # Global history of previously played songs (indices)
self.forward_queue = [] # Predicted future songs for shuffle mode
self.forward_history = [] # Songs to return to when hitting next after previous
[docs]
def set_repeat_mode(self, mode):
"""
Set the repeat mode (can be changed dynamically).
Mode changes preserve forward queue - only manual selections clear it.
Args:
mode (str or RepeatMode): The repeat mode to set. Can be a string
("off", "track", "queue") or RepeatMode enum value.
"""
if isinstance(mode, str):
mode = RepeatMode(mode.lower())
old_repeat = self.repeat_mode
print(f"[DEBUG] set_repeat_mode(): Before change - current: {self.current_index}, forward_queue: {self.forward_queue}")
self.repeat_mode = mode
print(f"Repeat mode set to: {mode.value}")
print(f"[DEBUG] set_repeat_mode(): After change - forward queue preserved with {len(self.forward_queue)} items")
# Show what next song would be after mode change
if mode == RepeatMode.TRACK:
print(f"[DEBUG] set_repeat_mode(): Next song in track repeat would be: {self.current_index} (same)")
elif mode == RepeatMode.QUEUE:
next_idx = (self.current_index + 1) % len(self.songs) if len(self.songs) > 0 else None
print(f"[DEBUG] set_repeat_mode(): Next song in queue repeat would be: {next_idx}")
elif self.shuffle_mode and self.is_shuffle_effective():
if self.forward_queue:
print(f"[DEBUG] set_repeat_mode(): Next song in shuffle mode would be: {self.forward_queue[0]}")
else:
print(f"[DEBUG] set_repeat_mode(): Shuffle mode but no forward queue")
elif self.current_index + 1 < len(self.songs):
print(f"[DEBUG] set_repeat_mode(): Next song in normal mode would be: {self.current_index + 1}")
[docs]
def set_shuffle_mode(self, enabled):
"""
Set shuffle mode.
Mode changes preserve forward queue - only manual selections clear it.
Args:
enabled (bool): True to enable shuffle mode, False to disable it.
"""
old_shuffle = self.shuffle_mode
self.shuffle_mode = enabled
effective_shuffle = self.is_shuffle_effective()
# Show what the next song would be before and after the change
print(f"[DEBUG] set_shuffle_mode(): Before change - current: {self.current_index}, forward_queue: {self.forward_queue}")
print(f"Shuffle mode: {'ON' if enabled else 'OFF'}" +
(f" (disabled by repeat mode)" if enabled and not effective_shuffle else ""))
print(f"[DEBUG] set_shuffle_mode(): After change - forward queue preserved with {len(self.forward_queue)} items")
# Show what the next song would be after mode change
if not self.shuffle_mode and self.current_index + 1 < len(self.songs):
print(f"[DEBUG] set_shuffle_mode(): Next song in normal mode would be: {self.current_index + 1}")
elif self.shuffle_mode and effective_shuffle:
if self.forward_queue:
print(f"[DEBUG] set_shuffle_mode(): Next song in shuffle mode would be: {self.forward_queue[0]}")
else:
print(f"[DEBUG] set_shuffle_mode(): Shuffle mode but no forward queue - would generate new one")
[docs]
def is_shuffle_effective(self):
"""
Check if shuffle mode is effectively active.
Shuffle is only effective when repeat mode is OFF.
Returns:
bool: True if shuffle is both enabled and effective, False otherwise.
"""
return self.shuffle_mode and self.repeat_mode == RepeatMode.OFF
[docs]
def current_song(self):
"""
Get the current song.
Returns:
dict or None: The current song dictionary, or None if no song is available.
"""
if not self.songs or self.current_index >= len(self.songs):
return None
# current_index always refers directly to songs list
return self.songs[self.current_index]
def _get_next_shuffle_song(self):
"""
Get the next song in shuffle mode, using forward queue for consistency.
Returns:
int: Index of next song to play in shuffle mode
"""
# If we have a forward queue, use the next song from it
if self.forward_queue:
return self.forward_queue[0]
# Generate new forward queue with remaining unplayed songs
recent_history = self.playback_history[-len(self.songs):] if len(self.playback_history) >= len(self.songs) else self.playback_history
unplayed_indices = []
for i in range(len(self.songs)):
if recent_history.count(i) == 0 and i != self.current_index:
unplayed_indices.append(i)
if unplayed_indices:
# Shuffle the unplayed songs and store as forward queue
random.shuffle(unplayed_indices)
self.forward_queue = unplayed_indices
return self.forward_queue[0]
else:
# All songs played recently, generate completely new random sequence
all_indices = [i for i in range(len(self.songs)) if i != self.current_index]
random.shuffle(all_indices)
self.forward_queue = all_indices
return self.forward_queue[0] if self.forward_queue else self.current_index
[docs]
def has_songs(self):
"""
Check if the queue has any songs.
Returns:
bool: True if there are songs in the queue, False otherwise.
"""
return len(self.songs) > 0
[docs]
def next_track(self):
"""
Move to next track based on repeat mode and shuffle mode.
Prioritizes forward history (from previous button) over normal progression.
Always adds current song to global history for universal previous functionality.
Returns:
bool: True if there's a next track, False if queue ended.
"""
if not self.songs:
return False
# Check if we have forward history from previous button usage
if self.forward_history:
# Always add current song to playback history before moving
self.playback_history.append(self.current_index)
# Go back to where we were before hitting previous
next_index = self.forward_history.pop()
print(f"[DEBUG] next_track(): Using forward history, going to {next_index}")
self.current_index = next_index
return True
# Always add current song to playback history before moving (except track repeat)
if self.repeat_mode != RepeatMode.TRACK:
self.playback_history.append(self.current_index)
if self.repeat_mode == RepeatMode.TRACK:
# Track repeat: stay on same track
return True
elif self.repeat_mode == RepeatMode.QUEUE:
# Queue repeat: advance and loop at end
self.current_index = (self.current_index + 1) % len(self.songs)
return True
elif self.shuffle_mode and self.is_shuffle_effective():
# Shuffle mode: use forward queue for consistency
next_index = self._get_next_shuffle_song()
# Remove the used song from forward queue
if self.forward_queue and next_index == self.forward_queue[0]:
self.forward_queue.pop(0)
self.current_index = next_index
return True
else: # RepeatMode.OFF and shuffle OFF
# Normal progression: advance and stop at end
self.current_index += 1
return self.current_index < len(self.songs)
[docs]
def next_track_skip_missing(self):
"""
Move to next track like next_track() but automatically skips missing files.
For auto-progression after song ends - prevents playing missing files.
Returns:
bool: True if there's a next available track, False if queue ended.
"""
if not self.songs:
return False
# Special case: If current song is missing and in track repeat, skip to next
if self.repeat_mode == RepeatMode.TRACK:
current_song = self.current_song()
if current_song and current_song.get('file_missing', False):
print(f"[DEBUG] next_track_skip_missing(): Current song missing in track repeat, advancing")
# Override track repeat to advance past missing file
self.playback_history.append(self.current_index)
self.current_index += 1
if self.current_index >= len(self.songs):
if self.repeat_mode == RepeatMode.QUEUE:
self.current_index = 0
else:
return False
else:
# Track repeat with available file - stay on same track
return True
# For all other cases, use normal next_track logic but then check for missing files
original_index = self.current_index
max_attempts = len(self.songs) # Prevent infinite loops
attempts = 0
while attempts < max_attempts:
if not self.next_track():
return False
next_song = self.current_song()
if next_song and not next_song.get('file_missing', False):
# Found an available file
print(f"[DEBUG] next_track_skip_missing(): Found available song at index {self.current_index}")
return True
# This song is missing, continue searching
print(f"[DEBUG] next_track_skip_missing(): Skipping missing file at index {self.current_index}")
attempts += 1
# In queue repeat mode, check if we've looped back to start
if self.repeat_mode == RepeatMode.QUEUE and self.current_index == original_index:
print("[DEBUG] next_track_skip_missing(): Looped through entire queue, all files missing")
return False
# If we get here, all remaining files are missing
print("[DEBUG] next_track_skip_missing(): No more available files in queue")
return False
[docs]
def previous_track(self):
"""
Move to previous track using global playback history.
Always goes to the previously played song regardless of mode.
When going back, adds current song to forward history for next button.
Returns:
bool: True if successfully moved to previous track, False otherwise.
"""
if not self.songs:
return False
if self.repeat_mode == RepeatMode.TRACK:
# Track repeat: stay on same track
return True
elif len(self.playback_history) > 0:
# Add current song to forward history so next can return here
self.forward_history.append(self.current_index)
print(f"[DEBUG] previous_track(): Added {self.current_index} to forward history")
# Always go back to the most recent song in history, regardless of mode
self.current_index = self.playback_history.pop()
print(f"[DEBUG] previous_track(): Went back to {self.current_index}")
return True
else:
# No history available - can't go back further
return True
[docs]
def set_current_index(self, index):
"""
Set the current track index.
Manual song selection clears forward queue to resync predictions.
History tracking ensures proper previous button functionality.
Args:
index (int): The index to set as the current track.
Returns:
bool: True if the index was valid and set successfully, False otherwise.
"""
if 0 <= index < len(self.songs):
# Add current song to history before jumping to new one (if jumping to different song)
if self.current_index != index and self.current_index is not None:
self.playback_history.append(self.current_index)
print(f"[DEBUG] set_current_index(): Added {self.current_index} to history before jumping to {index}")
# Clear forward queue and forward history when manually jumping to a different song
# This ensures shuffle predictions are resynced and previous/next chain is reset
if self.current_index != index:
self.forward_queue = []
self.forward_history = []
print(f"[DEBUG] set_current_index(): Cleared forward queue and history due to manual song selection (jumped to {index})")
self.current_index = index
return True
return False
[docs]
def add_song(self, song):
"""
Add a song to the queue.
Args:
song (dict): Song dictionary to add to the queue
"""
self.songs.append(song)
self._update_play_order()
print(f"Added song to queue: {song.get('title', 'Unknown')}")
[docs]
def add_songs(self, songs):
"""
Add multiple songs to the queue.
Args:
songs (list): List of song dictionaries to add to the queue
"""
self.songs.extend(songs)
# Clear playback history when queue changes
self.playback_history.clear()
print(f"Added {len(songs)} songs to queue")
[docs]
def remove_song(self, index):
"""
Remove a song from the queue by index.
Args:
index (int): Index of song to remove
Returns:
bool: True if song was removed, False if index was invalid
"""
if 0 <= index < len(self.songs):
removed_song = self.songs.pop(index)
# Adjust current index if needed
if index < self.current_index:
self.current_index -= 1
elif index == self.current_index and self.current_index >= len(self.songs):
self.current_index = max(0, len(self.songs) - 1)
self._update_play_order()
print(f"Removed song from queue: {removed_song.get('title', 'Unknown')}")
return True
return False
[docs]
def shuffle_queue(self):
"""
Shuffle the entire queue by randomly reordering all songs.
This physically reorders the songs list and resets the current index.
Returns:
bool: True if queue was shuffled, False if queue is empty
"""
if not self.songs:
return False
# Get the currently playing song before shuffle
current_song = self.current_song() if self.has_songs() else None
# Shuffle the actual songs list
random.shuffle(self.songs)
# Find the new index of the currently playing song
if current_song:
for i, song in enumerate(self.songs):
if song == current_song:
self.current_index = i
break
else:
self.current_index = 0
# Update play order for the new arrangement
self._update_play_order()
print("Queue shuffled - song order randomized")
return True
[docs]
def play_random_song(self):
"""
Jump to a completely random song in the queue.
This doesn't reorder the queue, just changes the current playing position.
Returns:
bool: True if jumped to random song, False if queue is empty
"""
if not self.songs:
return False
# Select a random index
random_index = random.randint(0, len(self.songs) - 1)
# Set the current index to the random position
old_index = self.current_index
self.current_index = random_index
current_song = self.current_song()
song_title = current_song.get('title', 'Unknown') if current_song else 'Unknown'
print(f"Jumped to random song: {song_title} (position {random_index + 1}/{len(self.songs)})")
return True
[docs]
def clear_queue(self):
"""Clear all songs from the queue."""
self.songs.clear()
self.current_index = 0
self.playback_history.clear()
print("Queue cleared")
[docs]
def handle_song_finished(self):
"""
Handle when current song finishes playing.
Auto-skips missing files during progression.
Returns:
tuple: (should_continue: bool, next_song: dict or None)
should_continue: True if playback should continue
next_song: The next song to play, or None if should stop
"""
current_song = self.current_song()
if current_song:
print(f"Song finished: {current_song.get('title', 'Unknown')}")
# Use next_track_skip_missing to skip missing files during auto-progression
if self.next_track_skip_missing():
next_song = self.current_song()
if next_song:
next_title = next_song.get('title', 'Unknown')
if self.repeat_mode == RepeatMode.TRACK:
print(f"Repeating track: {next_title}")
elif self.repeat_mode == RepeatMode.QUEUE:
print(f"Moving to next song: {next_title} (#{self.current_index + 1}/{len(self.songs)})")
else:
print(f"Moving to next song: {next_title} (#{self.current_index + 1}/{len(self.songs)})")
return True, next_song
print("Queue finished - no more songs to play")
return False, None
[docs]
def play_queue_with_manager(songs, repeat_mode="off", shuffle=False, start_index=0, conn=None):
"""
Play songs using QueueManager with dynamic repeat mode support.
This approach handles looping at the queue level rather than player level.
Args:
songs (list): List of song dictionaries or database records.
repeat_mode (str): Repeat mode - "off", "track", or "queue"
shuffle (bool): Enable shuffle mode
start_index (int): Index to start playback from
conn (sqlite3.Connection): Database connection for auto-adding missing songs
"""
if not songs:
print("Queue is empty. Nothing to play.")
return
# Create queue manager
queue_manager = QueueManager(songs)
queue_manager.set_repeat_mode(repeat_mode)
queue_manager.set_shuffle_mode(shuffle)
queue_manager.set_current_index(start_index)
print(f"\n=== Playing {len(songs)} songs ===")
print(f"Repeat mode: {repeat_mode}")
print(f"Shuffle: {'ON' if shuffle else 'OFF'}")
print("Press Ctrl+C to control playback")
print("Controls: 'q'=quit, 'n'=next, 's'=toggle shuffle, 'i'=instant shuffle, 'r'=repeat mode\n")
while True:
try:
current_song = queue_manager.current_song()
if not current_song:
print("No current song available.")
break
# Display current queue status
display_queue(songs, queue_manager.current_index)
print(f"Now playing: {format_song_info(current_song)}")
# Get file path
file_path = current_song['url']
if file_path.startswith('file://'):
file_path = file_path[7:]
# Check if file exists
if not os.path.exists(file_path):
print(f"Warning: File not found: {file_path}")
print("Skipping to next song...")
if not queue_manager.next_track():
break
continue
# Auto-add missing song to database if connection is available
if conn and DATABASE_AVAILABLE:
try:
add_missing_song_to_database(file_path, conn)
except Exception as e:
print(f"Note: Could not auto-add song to database: {e}")
# Play the song
success = play_audio(file_path)
if not success:
print("Error playing song. Skipping to next...")
# Move to next track based on repeat mode
if not queue_manager.next_track():
print("\nReached end of queue.")
break
except KeyboardInterrupt:
print("\nPlayback interrupted by user.")
user_input = input("Enter 'q' to quit, 'n' for next song, 's' to toggle shuffle, 'i' for instant shuffle, 'r' to change repeat mode, or any other key to continue: ").lower()
if user_input == 'q':
break
elif user_input == 'n':
if not queue_manager.next_track():
print("End of queue reached.")
break
elif user_input == 's':
queue_manager.set_shuffle_mode(not queue_manager.shuffle_mode)
elif user_input == 'i':
# Instant shuffle - jump to random song immediately
if queue_manager.play_random_song():
print("Jumped to random song!")
# Continue playing the new random song
else:
print("Could not shuffle - empty queue.")
elif user_input == 'r':
print("Repeat modes: off, track, queue")
new_mode = input("Enter repeat mode: ").strip().lower()
if new_mode in ['off', 'track', 'queue']:
queue_manager.set_repeat_mode(new_mode)
else:
print("Invalid repeat mode.")
except Exception as e:
print(f"Error during playback: {e}")
if not queue_manager.next_track():
break
print("Playback finished.")
[docs]
def play_single_song_with_loop(song_path, repeat_mode="off"):
"""
Play a single song with optional looping using the queue system.
This function is designed for GUI integration.
Args:
song_path (str): Path to the audio file
repeat_mode (str): "off", "track", or "queue"
Returns:
QueueManager: The queue manager instance for external control
"""
# Create a minimal song dict for the queue
song = {
'url': song_path,
'title': Path(song_path).stem,
'artist': 'Unknown Artist',
'album': 'Unknown Album'
}
# Create queue manager with single song
queue_manager = QueueManager([song])
queue_manager.set_repeat_mode(repeat_mode)
print(f"Playing: {song['title']}")
print(f"Repeat mode: {repeat_mode}")
return queue_manager
[docs]
def connect_to_database(db_path):
"""
Connect to the SQLite database and return connection.
Args:
db_path (str): Path to the SQLite database file.
Returns:
sqlite3.Connection or None: Database connection object, or None if connection fails.
"""
"""
Connect to the SQLite database and return connection.
Args:
db_path (str): Path to the SQLite database file.
Returns:
sqlite3.Connection or None: Database connection object, or None if connection fails.
"""
if not os.path.exists(db_path):
print(f"Error: Database file '{db_path}' not found.")
print("Please run database.py first to create the database.")
return None
try:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row # Enable dict-like access to rows
return conn
except sqlite3.Error as e:
print(f"Error connecting to database: {e}")
return None
[docs]
def get_songs_from_database(conn, filters=None):
"""
Get songs from database based on filters.
Args:
conn (sqlite3.Connection): Database connection object.
filters (dict, optional): Dictionary with filter criteria.
Supported keys: 'artist', 'album', 'genre' (all use partial matching).
Returns:
list: List of song records as sqlite3.Row objects, ordered by artist, album, disc, track.
"""
cursor = conn.cursor()
# Base query
query = """
SELECT id, title, artist, album, albumartist, url, length, track, disc, year, genre
FROM songs
WHERE unavailable = 0
"""
params = []
# Apply filters if provided
if filters:
if filters.get('artist'):
query += " AND (artist LIKE ? OR albumartist LIKE ?)"
artist_filter = f"%{filters['artist']}%"
params.extend([artist_filter, artist_filter])
if filters.get('album'):
query += " AND album LIKE ?"
params.append(f"%{filters['album']}%")
if filters.get('genre'):
query += " AND genre LIKE ?"
params.append(f"%{filters['genre']}%")
# Order by artist, album, disc, track for logical playback order
query += " ORDER BY artist, album, disc, track"
try:
cursor.execute(query, params)
return cursor.fetchall()
except sqlite3.Error as e:
print(f"Error querying database: {e}")
return []
[docs]
def display_queue(queue, current_index=0):
"""
Display the current queue with highlighting for current song.
Args:
queue (list): List of song dictionaries or database records.
current_index (int, optional): Index of currently playing song. Defaults to 0.
"""
if not queue:
print("Queue is empty.")
return
print(f"\n=== Audio Queue ({len(queue)} songs) ===")
for i, song in enumerate(queue):
marker = "> " if i == current_index else " "
print(f"{marker}{i+1:3d}. {format_song_info(song)}")
print()
[docs]
def play_queue(queue, shuffle=False, repeat=False, repeat_track=False, start_index=0, conn=None):
"""
Play songs in the queue with various playback options.
Args:
queue (list): List of song dictionaries or database records.
shuffle (bool, optional): Enable shuffle mode. Defaults to False.
repeat (bool, optional): Enable repeat mode (queue repeat). Defaults to False.
repeat_track (bool, optional): Enable track repeat mode. Defaults to False.
start_index (int, optional): Index to start playback from. Defaults to 0.
conn (sqlite3.Connection, optional): Database connection for auto-adding missing songs.
"""
if not queue:
print("Queue is empty. Nothing to play.")
return
# Create a copy of the queue for manipulation
play_order = list(range(len(queue)))
if shuffle:
random.shuffle(play_order)
print("Shuffle mode enabled.")
if repeat:
print("Repeat mode enabled.")
if repeat_track:
print("Track repeat mode enabled.")
current_index = start_index
while True:
try:
# Get the current song index based on play order
if shuffle:
song_index = play_order[current_index % len(play_order)]
else:
song_index = current_index % len(queue)
song = queue[song_index]
# Display current queue status
display_queue(queue, song_index)
print(f"Now playing: {format_song_info(song)}")
# Get the file path from the URL (remove file:// prefix if present)
file_path = song['url']
if file_path.startswith('file://'):
file_path = file_path[7:] # Remove 'file://' prefix
# Check if file exists
if not os.path.exists(file_path):
print(f"Warning: File not found: {file_path}")
print("Skipping to next song...")
current_index += 1
continue
# Auto-add missing song to database if connection is available
if conn and DATABASE_AVAILABLE:
try:
add_missing_song_to_database(file_path, conn)
except Exception as e:
print(f"Note: Could not auto-add song to database: {e}")
# Play the song
success = play_audio(file_path)
if not success:
print("Error playing song. Skipping to next...")
# Handle repeat modes based on current setting
if repeat_track:
# Track repeat: stay on the same song
continue
else:
# Normal progression
current_index += 1
# Check if we've reached the end of the queue
if current_index >= len(queue):
if repeat:
current_index = 0
print("\nRepeating queue from the beginning...")
else:
print("\nReached end of queue.")
break
except KeyboardInterrupt:
print("\nPlayback interrupted by user.")
user_input = input("Enter 'q' to quit, 'n' for next song, or any other key to continue: ").lower()
if user_input == 'q':
break
elif user_input == 'n':
current_index += 1
continue
else:
continue
except Exception as e:
print(f"Error during playback: {e}")
current_index += 1
continue
[docs]
def add_missing_song_to_database(file_path, conn):
"""
Add missing song to database automatically during playback.
Args:
file_path (str): Path to the audio file to add.
conn (sqlite3.Connection): Database connection object.
Returns:
bool: True if song was added successfully, False otherwise.
"""
if not DATABASE_AVAILABLE:
return False
try:
cursor = conn.cursor()
file_url = f"file://{file_path}"
# Check if song already exists
cursor.execute('SELECT id FROM songs WHERE url = ?', (file_url,))
if cursor.fetchone():
return False # Song already exists
print(f" Adding missing song to database: {os.path.basename(file_path)}")
# Get file stats
stat = os.stat(file_path)
# Get directory for this file
dir_path = str(Path(file_path).parent)
# Add directory to directories table if not exists
try:
dir_stat = os.stat(dir_path)
cursor.execute('''
INSERT OR IGNORE INTO directories (path, mtime)
VALUES (?, ?)
''', (str(dir_path), int(dir_stat.st_mtime)))
except Exception:
pass # Directory might not exist or be accessible
# Get directory ID
cursor.execute('SELECT id FROM directories WHERE path = ?', (str(dir_path),))
result = cursor.fetchone()
directory_id = result[0] if result else None
# Extract metadata
metadata = extract_metadata(file_path)
if metadata is None:
print(f" Warning: Could not extract metadata from {file_path}")
return False
# Generate IDs
fingerprint = get_file_hash(file_path)
song_id = fingerprint
artist_id = hashlib.md5(metadata['artist'].encode()).hexdigest() if metadata['artist'] else ''
album_id = hashlib.md5(f"{metadata['albumartist'] or metadata['artist']}:{metadata['album']}".encode()).hexdigest() if metadata['album'] else ''
# Insert into database
file_ext = Path(file_path).suffix.lower()
cursor.execute('''
INSERT OR IGNORE INTO songs (
title, album, artist, albumartist, track, disc, year, originalyear,
genre, composer, performer, grouping, comment, lyrics,
url, directory_id, basefilename, filetype, filesize, mtime, ctime,
length, bitrate, samplerate, bitdepth,
compilation, art_embedded, fingerprint, song_id, artist_id, album_id,
lastseen, source
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
metadata['title'], metadata['album'], metadata['artist'], metadata['albumartist'],
metadata['track'], metadata['disc'], metadata['year'], metadata['originalyear'],
metadata['genre'], metadata['composer'], metadata['performer'], metadata['grouping'],
metadata['comment'], metadata['lyrics'],
file_url, directory_id, Path(file_path).name, file_ext[1:] if file_ext else '', stat.st_size,
int(stat.st_mtime), int(stat.st_ctime),
metadata['length'], metadata['bitrate'], metadata['samplerate'], metadata['bitdepth'],
metadata['compilation'], metadata['art_embedded'], fingerprint, song_id, artist_id, album_id,
int(time.time()), 4 # source = 4 (Auto-added during playback)
))
conn.commit()
print(f" Added: {metadata['artist']} - {metadata['title']}")
return True
except Exception as e:
print(f" Error adding song to database: {e}")
return False
[docs]
def interactive_mode(conn):
"""
Interactive mode for queue management.
Provides a command-line interface for managing audio queues with
commands for filtering, loading, and playing songs.
Args:
conn (sqlite3.Connection): Database connection object.
"""
queue = []
filters = {}
print("\n=== Interactive Audio Queue Mode ===")
print("Commands:")
print(" list - Show all songs in library")
print(" filter - Set filters (artist, album, genre)")
print(" load - Load songs based on current filters")
print(" playlist - Load songs from M3U playlist file")
print(" show - Show current queue")
print(" play - Play current queue")
print(" shuffle - Toggle shuffle mode")
print(" repeat - Toggle repeat mode")
print(" clear - Clear current queue")
print(" quit - Exit interactive mode")
print()
shuffle_mode = False
repeat_mode = False
while True:
try:
command = input("queue> ").strip().lower()
if command == 'quit' or command == 'q':
break
elif command == 'list':
songs = get_songs_from_database(conn)
if songs:
print(f"\nFound {len(songs)} songs in library:")
for i, song in enumerate(songs[:20]): # Show first 20
print(f" {i+1:3d}. {format_song_info(song)}")
if len(songs) > 20:
print(f" ... and {len(songs) - 20} more")
else:
print("No songs found in library.")
elif command == 'filter':
print("Set filters (press Enter to skip):")
artist = input("Artist: ").strip()
album = input("Album: ").strip()
genre = input("Genre: ").strip()
filters = {}
if artist:
filters['artist'] = artist
if album:
filters['album'] = album
if genre:
filters['genre'] = genre
print(f"Filters set: {filters}")
elif command == 'load':
songs = get_songs_from_database(conn, filters)
if songs:
queue = list(songs)
print(f"Loaded {len(queue)} songs into queue.")
else:
print("No songs found matching current filters.")
elif command == 'show':
display_queue(queue)
elif command == 'play':
if queue:
play_queue(queue, shuffle_mode, repeat_mode, 0, conn)
else:
print("Queue is empty. Use 'load' to add songs first.")
elif command == 'shuffle':
shuffle_mode = not shuffle_mode
print(f"Shuffle mode: {'ON' if shuffle_mode else 'OFF'}")
elif command == 'repeat':
repeat_mode = not repeat_mode
print(f"Repeat mode: {'ON' if repeat_mode else 'OFF'}")
elif command == 'playlist':
playlist_path = input("Enter playlist file path: ").strip()
if not os.path.exists(playlist_path):
print(f"Error: Playlist file '{playlist_path}' not found.")
continue
songs = load_m3u_playlist(playlist_path)
if songs:
queue = list(songs)
print(f"Loaded {len(queue)} songs from playlist '{playlist_path}'.")
else:
print("No songs found in playlist or failed to load.")
elif command == 'clear':
queue = []
print("Queue cleared.")
elif command == 'help':
print("Commands: list, filter, load, playlist, show, play, shuffle, repeat, clear, quit")
else:
print("Unknown command. Type 'help' for available commands.")
except KeyboardInterrupt:
print("\nExiting interactive mode...")
break
except Exception as e:
print(f"Error: {e}")
[docs]
def main():
"""
Main function for audio queue management command-line interface.
Parses command-line arguments and performs queue operations including
database filtering, playlist loading, and various playback modes.
Examples:
Play all songs by an artist with shuffle:
python queue.py --artist "Pink Floyd" --shuffle
Play specific album on repeat:
python queue.py --album "Dark Side of the Moon" --repeat
Play from external playlist:
python queue.py --playlist myplaylist.m3u
Interactive mode with genre filter:
python queue.py --genre "Rock" --interactive
Custom database path:
python queue.py --db-path ~/music.db --shuffle
"""
parser = argparse.ArgumentParser(
description="Audio Queue Manager - Play songs from your music library",
epilog="Example: python queue.py --artist 'Pink Floyd' --shuffle OR python queue.py --playlist myplaylist.m3u"
)
parser.add_argument(
"--db-path",
default=DEFAULT_DB_PATH,
help=f"Path to the SQLite database file (default: {DEFAULT_DB_PATH})"
)
parser.add_argument(
"--shuffle",
action="store_true",
help="Shuffle the queue"
)
parser.add_argument(
"--repeat",
action="store_true",
help="Repeat the queue when it ends"
)
parser.add_argument(
"--artist",
help="Filter songs by artist name (partial match)"
)
parser.add_argument(
"--album",
help="Filter songs by album name (partial match)"
)
parser.add_argument(
"--genre",
help="Filter songs by genre (partial match)"
)
parser.add_argument(
"--interactive",
action="store_true",
help="Start in interactive mode"
)
parser.add_argument(
"--list",
action="store_true",
help="List all songs in the library and exit"
)
parser.add_argument(
"--playlist",
help="Load songs from an M3U playlist file"
)
args = parser.parse_args()
# Playlist mode - load from M3U file (no database needed for loading, but can auto-add)
if args.playlist:
if not os.path.exists(args.playlist):
print(f"Error: Playlist file '{args.playlist}' not found.")
sys.exit(1)
songs = load_m3u_playlist(args.playlist)
if not songs:
print("No songs found in playlist or failed to load playlist.")
sys.exit(1)
print(f"Loaded {len(songs)} songs from playlist '{args.playlist}'.")
# Try to connect to database for auto-adding missing songs
conn = None
if DATABASE_AVAILABLE:
try:
if os.path.exists(args.db_path):
conn = sqlite3.connect(args.db_path)
conn.row_factory = sqlite3.Row
print(f"Connected to database for auto-adding missing songs: {args.db_path}")
else:
print(f"Database not found ({args.db_path}). Songs will play without auto-adding to database.")
except Exception as e:
print(f"Could not connect to database: {e}")
conn = None
try:
# Play the playlist
play_queue(list(songs), args.shuffle, args.repeat, 0, conn)
finally:
if conn:
conn.close()
return
# Connect to database for database-based operations
conn = connect_to_database(args.db_path)
if not conn:
sys.exit(1)
try:
# List mode - just show songs and exit
if args.list:
songs = get_songs_from_database(conn)
if songs:
print(f"Found {len(songs)} songs in library:")
for i, song in enumerate(songs):
print(f" {i+1:3d}. {format_song_info(song)}")
else:
print("No songs found in library.")
return
# Interactive mode
if args.interactive:
interactive_mode(conn)
return
# Build filters from command line arguments
filters = {}
if args.artist:
filters['artist'] = args.artist
if args.album:
filters['album'] = args.album
if args.genre:
filters['genre'] = args.genre
# Get songs from database
songs = get_songs_from_database(conn, filters)
if not songs:
print("No songs found matching the specified criteria.")
print("Use --list to see all available songs.")
sys.exit(1)
print(f"Found {len(songs)} songs matching criteria.")
# Play the queue
play_queue(list(songs), args.shuffle, args.repeat, 0, conn)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
finally:
conn.close()
# Run file
if __name__ == "__main__":
main()