Source code for modules.core.queue

#!/usr/bin/env python3
"""
Audio Queue Manager
Copyright (c) 2025 TAPS OSS
Project: https://github.com/TAPSOSS/Walrio
Licensed under the BSD-3-Clause License (see LICENSE file for details)

A script that manages a queue of songs from the audio library and plays them using player.py and a db file (assumed to be generated by database.py).
"""

import sys
import os
import sqlite3
import argparse
import random
import subprocess
import hashlib
import time
from pathlib import Path
from player import play_audio
from playlist import load_m3u_playlist

# Try to import database functions for auto-adding missing songs
try:
    from database import extract_metadata, get_file_hash
    DATABASE_AVAILABLE = True
except ImportError:
    print("Note: database.py functions not available. Auto-adding missing songs disabled.")
    DATABASE_AVAILABLE = False

# Default database path
DEFAULT_DB_PATH = "walrio_library.db"

def connect_to_database(db_path):
    """
    Open the SQLite library database and hand back a ready-to-use connection.

    The connection's row factory is set to ``sqlite3.Row`` so query results
    can be indexed by column name.

    Args:
        db_path (str): Path to the SQLite database file.

    Returns:
        sqlite3.Connection or None: The open connection, or None when the
            file does not exist or SQLite reports an error while opening it.
    """
    if os.path.exists(db_path):
        try:
            connection = sqlite3.connect(db_path)
            connection.row_factory = sqlite3.Row  # name-based column access
        except sqlite3.Error as exc:
            print(f"Error connecting to database: {exc}")
            return None
        else:
            return connection

    # The file is missing: point the user at the tool that creates it.
    print(f"Error: Database file '{db_path}' not found.")
    print("Please run database.py first to create the database.")
    return None
def get_songs_from_database(conn, filters=None):
    """
    Fetch available songs from the library, optionally narrowed by filters.

    Only rows with ``unavailable = 0`` are returned. Every filter is applied
    as a ``LIKE '%value%'`` match; the artist filter is checked against both
    the ``artist`` and ``albumartist`` columns.

    Args:
        conn (sqlite3.Connection): Database connection object.
        filters (dict, optional): Filter criteria. Supported keys:
            'artist', 'album', 'genre'. Missing or empty values are ignored.

    Returns:
        list: Matching song rows ordered by artist, album, disc, track.
            An empty list is returned if the query fails.
    """
    cursor = conn.cursor()

    # (filter key, SQL fragment, number of placeholders in the fragment)
    filter_clauses = (
        ('artist', " AND (artist LIKE ? OR albumartist LIKE ?)", 2),
        ('album', " AND album LIKE ?", 1),
        ('genre', " AND genre LIKE ?", 1),
    )

    sql_parts = ["""
        SELECT id, title, artist, album, albumartist, url, length, track, disc, year, genre
        FROM songs
        WHERE unavailable = 0
    """]
    bound_values = []

    if filters:
        for key, fragment, placeholder_count in filter_clauses:
            wanted = filters.get(key)
            if not wanted:
                continue
            sql_parts.append(fragment)
            bound_values.extend([f"%{wanted}%"] * placeholder_count)

    # Sort so that albums play back in their natural order.
    sql_parts.append(" ORDER BY artist, album, disc, track")
    statement = "".join(sql_parts)

    try:
        cursor.execute(statement, bound_values)
        return cursor.fetchall()
    except sqlite3.Error as exc:
        print(f"Error querying database: {exc}")
        return []
def format_song_info(song):
    """
    Format song information for display with comprehensive metadata.

    Fields that are missing from the record or empty are tolerated: text
    fields fall back to "Unknown ..." placeholders (album artist falls back
    to the artist), and the track, year and duration parts are simply left
    out. Durations and track numbers that arrive as floats or numeric
    strings are converted to whole numbers instead of raising.

    Args:
        song (dict or sqlite3.Row): Song record with metadata. Keys read:
            'artist', 'albumartist', 'title', 'album', 'length', 'track',
            'year'.

    Returns:
        str: "<track>. <artist> - <title> (<albumartist> - <album> (<year>)) [m:ss]"
            with the optional parts omitted when unavailable.
    """
    def field(key):
        # dict raises KeyError, sqlite3.Row raises IndexError for an unknown key.
        try:
            return song[key]
        except (KeyError, IndexError):
            return None

    artist = field('artist') or "Unknown Artist"
    albumartist = field('albumartist') or artist
    title = field('title') or "Unknown Title"
    album = field('album') or "Unknown Album"

    # Duration as minutes:seconds; skipped when absent or not numeric.
    duration = ""
    length = field('length')
    if length:
        total_seconds = _coerce_int(length)
        if total_seconds is not None:
            duration = f" [{total_seconds // 60}:{total_seconds % 60:02d}]"

    # Track number, zero-padded when numeric, shown verbatim otherwise
    # (e.g. a tag value such as "3/12").
    track = ""
    raw_track = field('track')
    if raw_track:
        track_number = _coerce_int(raw_track)
        if track_number is not None:
            track = f"{track_number:02d}. "
        else:
            track = f"{raw_track}. "

    year = ""
    raw_year = field('year')
    if raw_year:
        year = f" ({raw_year})"

    return f"{track}{artist} - {title} ({albumartist} - {album}{year}){duration}"


def _coerce_int(value):
    """Return ``value`` as an int (truncating floats), or None if not numeric."""
    for convert in (int, lambda v: int(float(v))):
        try:
            return convert(value)
        except (TypeError, ValueError, OverflowError):
            continue
    return None
def display_queue(queue, current_index=0):
    """
    Print the queue, one numbered line per song, marking the current one.

    Args:
        queue (list): List of song dictionaries or database records.
        current_index (int, optional): Zero-based index of the song to mark
            with the "▶" indicator. Defaults to 0.
    """
    if not queue:
        print("Queue is empty.")
        return

    print(f"\n=== Audio Queue ({len(queue)} songs) ===")
    for position, entry in enumerate(queue, start=1):
        # Positions are shown 1-based; current_index is 0-based.
        if position - 1 == current_index:
            prefix = "▶ "
        else:
            prefix = " "
        print(f"{prefix}{position:3d}. {format_song_info(entry)}")
    print()
def play_queue(queue, shuffle=False, repeat=False, start_index=0, conn=None):
    """
    Play songs in the queue with various playback options.

    Each song's 'url' is turned into a local path (a leading ``file://`` is
    stripped), the queue is displayed, and the file is handed to
    ``play_audio``. Missing files, playback failures and unexpected errors
    skip to the next song. Every way of advancing -- normal completion,
    a skip, or the user's "next" choice -- goes through the same
    end-of-queue check, so playback stops at the end of the queue unless
    repeat mode is on.

    Pressing Ctrl-C during playback prompts for 'q' (quit), 'n' (next song)
    or anything else (restart the current song).

    In repeat mode, playback stops if a full pass over the queue did not
    produce a single successfully played song, to avoid spinning forever on
    a queue of missing or unplayable files.

    Args:
        queue (list): List of song dictionaries or database records.
        shuffle (bool, optional): Enable shuffle mode. Defaults to False.
        repeat (bool, optional): Enable repeat mode. Defaults to False.
        start_index (int, optional): Index to start playback from. Defaults to 0.
        conn (sqlite3.Connection, optional): Database connection for
            auto-adding missing songs.
    """
    if not queue:
        print("Queue is empty. Nothing to play.")
        return

    # Positions in play_order map to indices in queue; identity unless shuffled.
    play_order = list(range(len(queue)))

    if shuffle:
        random.shuffle(play_order)
        print("Shuffle mode enabled.")

    if repeat:
        print("Repeat mode enabled.")

    current_index = start_index
    consecutive_failures = 0  # songs in a row that could not be played

    while True:
        try:
            # Modulo keeps an out-of-range start_index inside the queue.
            song_index = play_order[current_index % len(play_order)]
            song = queue[song_index]

            display_queue(queue, song_index)
            print(f"Now playing: {format_song_info(song)}")

            # Get the file path from the URL (remove file:// prefix if present)
            file_path = song['url']
            if file_path.startswith('file://'):
                file_path = file_path[7:]

            if os.path.exists(file_path):
                # Best effort: a failed auto-add must not prevent playback.
                if conn and DATABASE_AVAILABLE:
                    try:
                        add_missing_song_to_database(file_path, conn)
                    except Exception as e:
                        print(f"Note: Could not auto-add song to database: {e}")

                played = bool(play_audio(file_path))
                if not played:
                    print("Error playing song. Skipping to next...")
            else:
                print(f"Warning: File not found: {file_path}")
                print("Skipping to next song...")
                played = False

        except KeyboardInterrupt:
            print("\nPlayback interrupted by user.")
            user_input = input("Enter 'q' to quit, 'n' for next song, or any other key to continue: ").lower()
            if user_input == 'q':
                break
            if user_input != 'n':
                # Same index again: the current song restarts.
                continue
            # A deliberate skip is not a playback failure.
            played = True

        except Exception as e:
            print(f"Error during playback: {e}")
            played = False

        consecutive_failures = 0 if played else consecutive_failures + 1

        # Single advance point, so skips cannot bypass the end-of-queue check.
        current_index += 1
        if current_index >= len(queue):
            if not repeat:
                print("\nReached end of queue.")
                break
            if consecutive_failures >= len(queue):
                print("\nNo playable songs in queue. Stopping playback.")
                break
            current_index = 0
            print("\nRepeating queue from the beginning...")
def add_missing_song_to_database(file_path, conn):
    """
    Add missing song to database automatically during playback.

    Looks the file up by its ``file://`` URL and, if no row exists, extracts
    its metadata, registers its parent directory in ``directories`` and
    inserts a row into ``songs`` with ``source = 4``. Metadata is extracted
    before anything is written so an unreadable file leaves no pending
    changes, and any failure rolls the transaction back so no write
    transaction is left open on the shared connection.

    Args:
        file_path (str): Path to the audio file to add.
        conn (sqlite3.Connection): Database connection object.

    Returns:
        bool: True if a new song row was inserted, False otherwise
            (database support unavailable, song already present, metadata
            unreadable, insert ignored by the database, or an error).
    """
    if not DATABASE_AVAILABLE:
        return False

    try:
        cursor = conn.cursor()
        file_url = f"file://{file_path}"

        # Check if song already exists
        cursor.execute('SELECT id FROM songs WHERE url = ?', (file_url,))
        if cursor.fetchone():
            return False

        print(f" Adding missing song to database: {os.path.basename(file_path)}")

        metadata = extract_metadata(file_path)
        if metadata is None:
            print(f" Warning: Could not extract metadata from {file_path}")
            return False

        stat = os.stat(file_path)
        path_obj = Path(file_path)
        dir_path = str(path_obj.parent)

        # Register the directory (best effort); the song is still added with
        # directory_id = None if this fails.
        try:
            dir_stat = os.stat(dir_path)
            cursor.execute('''
                INSERT OR IGNORE INTO directories (path, mtime)
                VALUES (?, ?)
            ''', (dir_path, int(dir_stat.st_mtime)))
        except (OSError, sqlite3.Error) as e:
            print(f" Note: Could not register directory '{dir_path}': {e}")

        cursor.execute('SELECT id FROM directories WHERE path = ?', (dir_path,))
        result = cursor.fetchone()
        directory_id = result[0] if result else None

        # Generate IDs: the file hash doubles as fingerprint and song id;
        # artist/album ids are MD5 digests of their names.
        fingerprint = get_file_hash(file_path)
        song_id = fingerprint
        artist_id = hashlib.md5(metadata['artist'].encode()).hexdigest() if metadata['artist'] else ''
        album_id = hashlib.md5(
            f"{metadata['albumartist'] or metadata['artist']}:{metadata['album']}".encode()
        ).hexdigest() if metadata['album'] else ''

        file_ext = path_obj.suffix.lower()

        # Column -> value mapping keeps each value next to its column and
        # guarantees the placeholder count always matches.
        song_row = {
            'title': metadata['title'],
            'album': metadata['album'],
            'artist': metadata['artist'],
            'albumartist': metadata['albumartist'],
            'track': metadata['track'],
            'disc': metadata['disc'],
            'year': metadata['year'],
            'originalyear': metadata['originalyear'],
            'genre': metadata['genre'],
            'composer': metadata['composer'],
            'performer': metadata['performer'],
            'grouping': metadata['grouping'],
            'comment': metadata['comment'],
            'lyrics': metadata['lyrics'],
            'url': file_url,
            'directory_id': directory_id,
            'basefilename': path_obj.name,
            'filetype': file_ext[1:] if file_ext else '',
            'filesize': stat.st_size,
            'mtime': int(stat.st_mtime),
            'ctime': int(stat.st_ctime),
            'length': metadata['length'],
            'bitrate': metadata['bitrate'],
            'samplerate': metadata['samplerate'],
            'bitdepth': metadata['bitdepth'],
            'compilation': metadata['compilation'],
            'art_embedded': metadata['art_embedded'],
            'fingerprint': fingerprint,
            'song_id': song_id,
            'artist_id': artist_id,
            'album_id': album_id,
            'lastseen': int(time.time()),
            'source': 4,  # source = 4 (Auto-added during playback)
        }
        columns = ", ".join(song_row)
        placeholders = ", ".join("?" for _ in song_row)
        cursor.execute(
            f"INSERT OR IGNORE INTO songs ({columns}) VALUES ({placeholders})",
            tuple(song_row.values()),
        )
        # OR IGNORE silently drops the row on a constraint conflict; rowcount
        # tells us whether anything was actually written.
        ignored = cursor.rowcount == 0

        conn.commit()

        if ignored:
            print(f" Skipped (already in database): {os.path.basename(file_path)}")
            return False

        print(f" Added: {metadata['artist']} - {metadata['title']}")
        return True

    except Exception as e:
        print(f" Error adding song to database: {e}")
        # Do not leave a half-finished write transaction open.
        try:
            conn.rollback()
        except sqlite3.Error:
            pass
        return False
def interactive_mode(conn):
    """
    Interactive mode for queue management.

    Provides a command-line interface for managing audio queues with
    commands for filtering, loading, and playing songs. The loop ends on
    'quit'/'q', on Ctrl-C, or when standard input reaches end-of-file
    (Ctrl-D or exhausted piped input). Blank input is ignored. Any other
    error raised while handling a command is reported and the prompt
    continues.

    Args:
        conn (sqlite3.Connection): Database connection object.
    """
    queue = []
    filters = {}

    print("\n=== Interactive Audio Queue Mode ===")
    print("Commands:")
    print(" list - Show all songs in library")
    print(" filter - Set filters (artist, album, genre)")
    print(" load - Load songs based on current filters")
    print(" playlist - Load songs from M3U playlist file")
    print(" show - Show current queue")
    print(" play - Play current queue")
    print(" shuffle - Toggle shuffle mode")
    print(" repeat - Toggle repeat mode")
    print(" clear - Clear current queue")
    print(" quit - Exit interactive mode")
    print()

    shuffle_mode = False
    repeat_mode = False

    while True:
        try:
            command = input("queue> ").strip().lower()

            if not command:
                # Just Enter: show the prompt again.
                continue

            if command == 'quit' or command == 'q':
                break

            elif command == 'list':
                songs = get_songs_from_database(conn)
                if songs:
                    print(f"\nFound {len(songs)} songs in library:")
                    for i, song in enumerate(songs[:20]):  # Show first 20
                        print(f" {i+1:3d}. {format_song_info(song)}")
                    if len(songs) > 20:
                        print(f" ... and {len(songs) - 20} more")
                else:
                    print("No songs found in library.")

            elif command == 'filter':
                print("Set filters (press Enter to skip):")
                artist = input("Artist: ").strip()
                album = input("Album: ").strip()
                genre = input("Genre: ").strip()

                # Replace (not merge) the previous filters.
                filters = {}
                if artist:
                    filters['artist'] = artist
                if album:
                    filters['album'] = album
                if genre:
                    filters['genre'] = genre

                print(f"Filters set: {filters}")

            elif command == 'load':
                songs = get_songs_from_database(conn, filters)
                if songs:
                    queue = list(songs)
                    print(f"Loaded {len(queue)} songs into queue.")
                else:
                    print("No songs found matching current filters.")

            elif command == 'show':
                display_queue(queue)

            elif command == 'play':
                if queue:
                    play_queue(queue, shuffle_mode, repeat_mode, 0, conn)
                else:
                    print("Queue is empty. Use 'load' to add songs first.")

            elif command == 'shuffle':
                shuffle_mode = not shuffle_mode
                print(f"Shuffle mode: {'ON' if shuffle_mode else 'OFF'}")

            elif command == 'repeat':
                repeat_mode = not repeat_mode
                print(f"Repeat mode: {'ON' if repeat_mode else 'OFF'}")

            elif command == 'playlist':
                playlist_path = input("Enter playlist file path: ").strip()
                if not os.path.exists(playlist_path):
                    print(f"Error: Playlist file '{playlist_path}' not found.")
                    continue

                songs = load_m3u_playlist(playlist_path)
                if songs:
                    queue = list(songs)
                    print(f"Loaded {len(queue)} songs from playlist '{playlist_path}'.")
                else:
                    print("No songs found in playlist or failed to load.")

            elif command == 'clear':
                queue = []
                print("Queue cleared.")

            elif command == 'help':
                print("Commands: list, filter, load, playlist, show, play, shuffle, repeat, clear, quit")

            else:
                print("Unknown command. Type 'help' for available commands.")

        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: once stdin is exhausted every
            # further input() call raises it again, so treating it as an
            # ordinary error would loop forever.
            print("\nExiting interactive mode...")
            break
        except Exception as e:
            print(f"Error: {e}")
def main():
    """
    Command-line entry point for the audio queue manager.

    Parses the command-line options and then runs one of four modes:

    * ``--playlist FILE``: load an M3U playlist and play it. The database is
      optional here; it is opened only to auto-add songs during playback.
    * ``--list``: print every song in the library and exit.
    * ``--interactive``: start the interactive queue prompt.
    * otherwise: query the library with the ``--artist`` / ``--album`` /
      ``--genre`` filters and play the result.

    ``--shuffle`` and ``--repeat`` are forwarded to playback. The process
    exits with status 1 when the playlist or database is missing, when
    nothing matches, or when an unexpected error occurs.

    Examples:
        Play all songs by an artist with shuffle:
            python queue.py --artist "Pink Floyd" --shuffle

        Play specific album on repeat:
            python queue.py --album "Dark Side of the Moon" --repeat

        Play from external playlist:
            python queue.py --playlist myplaylist.m3u

        Custom database path:
            python queue.py --db-path ~/music.db --shuffle
    """
    parser = argparse.ArgumentParser(
        description="Audio Queue Manager - Play songs from your music library",
        epilog="Example: python queue.py --artist 'Pink Floyd' --shuffle OR python queue.py --playlist myplaylist.m3u"
    )

    # Options are registered in this order, which is also the --help order.
    option_table = (
        ("--db-path", {
            "default": DEFAULT_DB_PATH,
            "help": f"Path to the SQLite database file (default: {DEFAULT_DB_PATH})",
        }),
        ("--shuffle", {"action": "store_true", "help": "Shuffle the queue"}),
        ("--repeat", {"action": "store_true", "help": "Repeat the queue when it ends"}),
        ("--artist", {"help": "Filter songs by artist name (partial match)"}),
        ("--album", {"help": "Filter songs by album name (partial match)"}),
        ("--genre", {"help": "Filter songs by genre (partial match)"}),
        ("--interactive", {"action": "store_true", "help": "Start in interactive mode"}),
        ("--list", {"action": "store_true", "help": "List all songs in the library and exit"}),
        ("--playlist", {"help": "Load songs from an M3U playlist file"}),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)

    opts = parser.parse_args()

    # ---- Playlist mode: songs come from the M3U file, not the database ----
    if opts.playlist:
        playlist_file = opts.playlist
        if not os.path.exists(playlist_file):
            print(f"Error: Playlist file '{playlist_file}' not found.")
            sys.exit(1)

        playlist_songs = load_m3u_playlist(playlist_file)
        if not playlist_songs:
            print("No songs found in playlist or failed to load playlist.")
            sys.exit(1)

        print(f"Loaded {len(playlist_songs)} songs from playlist '{playlist_file}'.")

        # The database is optional in this mode; it only enables auto-adding.
        db_conn = None
        if DATABASE_AVAILABLE:
            try:
                if not os.path.exists(opts.db_path):
                    print(f"Database not found ({opts.db_path}). Songs will play without auto-adding to database.")
                else:
                    db_conn = sqlite3.connect(opts.db_path)
                    db_conn.row_factory = sqlite3.Row
                    print(f"Connected to database for auto-adding missing songs: {opts.db_path}")
            except Exception as exc:
                print(f"Could not connect to database: {exc}")
                db_conn = None

        try:
            play_queue(list(playlist_songs), opts.shuffle, opts.repeat, 0, db_conn)
        finally:
            if db_conn:
                db_conn.close()
        return

    # ---- Database-backed modes ----
    db_conn = connect_to_database(opts.db_path)
    if not db_conn:
        sys.exit(1)

    try:
        if opts.list:
            library = get_songs_from_database(db_conn)
            if not library:
                print("No songs found in library.")
            else:
                print(f"Found {len(library)} songs in library:")
                for number, entry in enumerate(library, start=1):
                    print(f" {number:3d}. {format_song_info(entry)}")
            return

        if opts.interactive:
            interactive_mode(db_conn)
            return

        # Keep only the filters the user actually supplied.
        criteria = {
            name: getattr(opts, name)
            for name in ('artist', 'album', 'genre')
            if getattr(opts, name)
        }

        matches = get_songs_from_database(db_conn, criteria)
        if not matches:
            print("No songs found matching the specified criteria.")
            print("Use --list to see all available songs.")
            sys.exit(1)

        print(f"Found {len(matches)} songs matching criteria.")
        play_queue(list(matches), opts.shuffle, opts.repeat, 0, db_conn)

    except Exception as exc:
        print(f"Error: {exc}")
        sys.exit(1)
    finally:
        db_conn.close()
# Run file if __name__ == "__main__": main()