# Source code for modules.core.queue

#!/usr/bin/env python3
"""
Audio Queue Manager
Copyright (c) 2025 TAPS OSS
Project: https://github.com/TAPSOSS/Walrio
Licensed under the BSD-3-Clause License (see LICENSE file for details)

A script that manages a queue of songs from the audio library and plays them using player.py and a db file (assumed to be generated by database.py).
"""

import sys
import os
import sqlite3
import argparse
import random
import hashlib
import time
from pathlib import Path
from .player import play_audio
from .playlist import load_m3u_playlist
from enum import Enum

# Try to import database functions for auto-adding missing songs
try:
    from .database import extract_metadata, get_file_hash
    DATABASE_AVAILABLE = True
except ImportError:
    print("Note: database.py functions not available. Auto-adding missing songs disabled.")
    DATABASE_AVAILABLE = False

# Default database path
DEFAULT_DB_PATH = "walrio_library.db"

class RepeatMode(Enum):
    """Repeat modes for audio playback.

    The member values are the lowercase strings accepted on the command
    line and by ``QueueManager.set_repeat_mode``.
    """

    # Play the queue once and stop at the end.
    OFF = "off"
    # Keep replaying the current track.
    TRACK = "track"
    # Wrap around to the first song after the last one.
    QUEUE = "queue"
class QueueManager:
    """
    Queue manager that handles dynamic repeat modes for audio playback.

    Provides playlist-level loop control with dynamic mode switching.

    All navigation state (``current_index``, ``playback_history``,
    ``forward_queue`` and ``forward_history``) is stored as integer positions
    into ``songs``. Any operation that adds, removes or reorders songs must
    therefore call ``_update_play_order()`` so no stale position survives.
    """

    def __init__(self, songs=None):
        """
        Initialize the QueueManager.

        Args:
            songs (list, optional): List of song dictionaries or database
                records. Defaults to None (empty list).
        """
        self.songs = songs or []
        self.current_index = 0
        self.repeat_mode = RepeatMode.OFF
        self.shuffle_mode = False
        self.playback_history = []  # Global history of previously played songs (indices)
        self.forward_queue = []     # Predicted future songs for shuffle mode
        self.forward_history = []   # Songs to return to when hitting next after previous

    def _update_play_order(self):
        """
        Invalidate every index-based prediction/history list.

        Called after the song list is mutated. The three lists hold raw
        positions into ``self.songs``; once songs are added, removed or
        reordered those positions may refer to different songs (or to
        nothing), so they are dropped and rebuilt lazily by later navigation.
        """
        self.playback_history.clear()
        self.forward_queue = []
        self.forward_history = []

    def set_repeat_mode(self, mode):
        """
        Set the repeat mode (can be changed dynamically).

        Mode changes preserve forward queue - only manual selections clear it.

        Args:
            mode (str or RepeatMode): The repeat mode to set. Can be a string
                ("off", "track", "queue", case-insensitive) or RepeatMode
                enum value.

        Raises:
            ValueError: If a string is given that is not a RepeatMode value.
        """
        if isinstance(mode, str):
            mode = RepeatMode(mode.lower())

        print(f"[DEBUG] set_repeat_mode(): Before change - current: {self.current_index}, forward_queue: {self.forward_queue}")
        self.repeat_mode = mode
        print(f"Repeat mode set to: {mode.value}")
        print(f"[DEBUG] set_repeat_mode(): After change - forward queue preserved with {len(self.forward_queue)} items")

        # Show what next song would be after mode change
        if mode == RepeatMode.TRACK:
            print(f"[DEBUG] set_repeat_mode(): Next song in track repeat would be: {self.current_index} (same)")
        elif mode == RepeatMode.QUEUE:
            next_idx = (self.current_index + 1) % len(self.songs) if self.songs else None
            print(f"[DEBUG] set_repeat_mode(): Next song in queue repeat would be: {next_idx}")
        elif self.shuffle_mode and self.is_shuffle_effective():
            if self.forward_queue:
                print(f"[DEBUG] set_repeat_mode(): Next song in shuffle mode would be: {self.forward_queue[0]}")
            else:
                print(f"[DEBUG] set_repeat_mode(): Shuffle mode but no forward queue")
        elif self.current_index + 1 < len(self.songs):
            print(f"[DEBUG] set_repeat_mode(): Next song in normal mode would be: {self.current_index + 1}")

    def set_shuffle_mode(self, enabled):
        """
        Set shuffle mode.

        Mode changes preserve forward queue - only manual selections clear it.

        Args:
            enabled (bool): True to enable shuffle mode, False to disable it.
        """
        self.shuffle_mode = enabled
        effective_shuffle = self.is_shuffle_effective()

        print(f"[DEBUG] set_shuffle_mode(): Before change - current: {self.current_index}, forward_queue: {self.forward_queue}")
        print(f"Shuffle mode: {'ON' if enabled else 'OFF'}" +
              (f" (disabled by repeat mode)" if enabled and not effective_shuffle else ""))
        print(f"[DEBUG] set_shuffle_mode(): After change - forward queue preserved with {len(self.forward_queue)} items")

        # Show what the next song would be after mode change
        if not self.shuffle_mode and self.current_index + 1 < len(self.songs):
            print(f"[DEBUG] set_shuffle_mode(): Next song in normal mode would be: {self.current_index + 1}")
        elif self.shuffle_mode and effective_shuffle:
            if self.forward_queue:
                print(f"[DEBUG] set_shuffle_mode(): Next song in shuffle mode would be: {self.forward_queue[0]}")
            else:
                print(f"[DEBUG] set_shuffle_mode(): Shuffle mode but no forward queue - would generate new one")

    def is_shuffle_effective(self):
        """
        Check if shuffle mode is effectively active.

        Shuffle is only effective when repeat mode is OFF.

        Returns:
            bool: True if shuffle is both enabled and effective, False otherwise.
        """
        return self.shuffle_mode and self.repeat_mode == RepeatMode.OFF

    def current_song(self):
        """
        Get the current song.

        Returns:
            dict or None: The current song, or None if the queue is empty or
            ``current_index`` has moved past the last song.
        """
        if not self.songs or self.current_index >= len(self.songs):
            return None
        # current_index always refers directly to songs list
        return self.songs[self.current_index]

    def _get_next_shuffle_song(self):
        """
        Get the next song in shuffle mode, using forward queue for consistency.

        The forward queue is only peeked here; the caller pops the entry once
        it actually moves to it.

        Returns:
            int: Index of next song to play in shuffle mode. Falls back to the
            current index when there is no other song to pick.
        """
        # If we have a forward queue, use the next song from it
        if self.forward_queue:
            return self.forward_queue[0]

        # Only the most recent len(songs) history entries count as "played".
        # A set makes the membership test O(1) per candidate.
        recently_played = set(self.playback_history[-len(self.songs):])
        every_other = [i for i in range(len(self.songs)) if i != self.current_index]
        candidates = [i for i in every_other if i not in recently_played]
        if not candidates:
            # All songs played recently: start a completely new random sequence
            candidates = every_other

        random.shuffle(candidates)
        self.forward_queue = candidates
        return self.forward_queue[0] if self.forward_queue else self.current_index

    def has_songs(self):
        """
        Check if the queue has any songs.

        Returns:
            bool: True if there are songs in the queue, False otherwise.
        """
        return len(self.songs) > 0

    def next_track(self):
        """
        Move to next track based on repeat mode and shuffle mode.

        Prioritizes forward history (from previous button) over normal
        progression. Adds the current song to the global history before
        moving, except when track repeat keeps the position unchanged.

        Returns:
            bool: True if there's a next track, False if queue ended. When
            False is returned in normal mode, ``current_index`` is left one
            past the last song.
        """
        if not self.songs:
            return False

        # Check if we have forward history from previous button usage
        if self.forward_history:
            self.playback_history.append(self.current_index)
            # Go back to where we were before hitting previous
            next_index = self.forward_history.pop()
            print(f"[DEBUG] next_track(): Using forward history, going to {next_index}")
            self.current_index = next_index
            return True

        if self.repeat_mode == RepeatMode.TRACK:
            # Track repeat: stay on same track, nothing to record
            return True

        self.playback_history.append(self.current_index)

        if self.repeat_mode == RepeatMode.QUEUE:
            # Queue repeat: advance and loop at end
            self.current_index = (self.current_index + 1) % len(self.songs)
            return True

        if self.shuffle_mode and self.is_shuffle_effective():
            # Shuffle mode: use forward queue for consistency
            next_index = self._get_next_shuffle_song()
            # Remove the used song from forward queue
            if self.forward_queue and next_index == self.forward_queue[0]:
                self.forward_queue.pop(0)
            self.current_index = next_index
            return True

        # RepeatMode.OFF and shuffle OFF: advance and stop at end
        self.current_index += 1
        return self.current_index < len(self.songs)

    def next_track_skip_missing(self):
        """
        Move to next track like next_track() but automatically skips missing files.

        For auto-progression after song ends - prevents playing missing files.
        A song counts as missing when its ``file_missing`` entry is truthy, so
        songs must support ``.get()`` (i.e. be dictionaries).

        Returns:
            bool: True if there's a next available track, False if queue ended.
        """
        if not self.songs:
            return False

        if self.repeat_mode == RepeatMode.TRACK:
            current = self.current_song()
            if not (current and current.get('file_missing', False)):
                # Track repeat with available file - stay on same track
                return True

            # The repeated track is missing: override track repeat and walk
            # forward to the first song that is actually available.
            print(f"[DEBUG] next_track_skip_missing(): Current song missing in track repeat, advancing")
            self.playback_history.append(self.current_index)
            for candidate in range(self.current_index + 1, len(self.songs)):
                if not self.songs[candidate].get('file_missing', False):
                    self.current_index = candidate
                    print(f"[DEBUG] next_track_skip_missing(): Found available song at index {self.current_index}")
                    return True
                print(f"[DEBUG] next_track_skip_missing(): Skipping missing file at index {candidate}")
            # Mirror next_track()'s end-of-queue state: index one past the end.
            self.current_index = len(self.songs)
            print("[DEBUG] next_track_skip_missing(): No more available files in queue")
            return False

        # For all other cases, use normal next_track logic but then check for missing files
        original_index = self.current_index
        for _ in range(len(self.songs)):  # Bounded to prevent infinite loops
            if not self.next_track():
                return False

            candidate_song = self.current_song()
            if candidate_song and not candidate_song.get('file_missing', False):
                print(f"[DEBUG] next_track_skip_missing(): Found available song at index {self.current_index}")
                return True

            # This song is missing, continue searching
            print(f"[DEBUG] next_track_skip_missing(): Skipping missing file at index {self.current_index}")

            # In queue repeat mode, check if we've looped back to start
            if self.repeat_mode == RepeatMode.QUEUE and self.current_index == original_index:
                print("[DEBUG] next_track_skip_missing(): Looped through entire queue, all files missing")
                return False

        # If we get here, all remaining files are missing
        print("[DEBUG] next_track_skip_missing(): No more available files in queue")
        return False

    def previous_track(self):
        """
        Move to previous track using global playback history.

        When going back, adds current song to forward history so the next
        button can return to it. In track repeat mode, or when there is no
        history, the position is left unchanged.

        Returns:
            bool: False only when the queue is empty. True otherwise, including
            the cases where the position did not change.
        """
        if not self.songs:
            return False

        if self.repeat_mode == RepeatMode.TRACK:
            # Track repeat: stay on same track
            return True

        if self.playback_history:
            # Add current song to forward history so next can return here
            self.forward_history.append(self.current_index)
            print(f"[DEBUG] previous_track(): Added {self.current_index} to forward history")

            # Always go back to the most recent song in history, regardless of mode
            self.current_index = self.playback_history.pop()
            print(f"[DEBUG] previous_track(): Went back to {self.current_index}")

        # No history available means we simply stay where we are
        return True

    def set_current_index(self, index):
        """
        Set the current track index.

        Manual song selection clears forward queue to resync predictions.
        History tracking ensures proper previous button functionality.

        Args:
            index (int): The index to set as the current track.

        Returns:
            bool: True if the index was valid and set successfully, False otherwise.
        """
        if not 0 <= index < len(self.songs):
            return False

        if self.current_index != index:
            # Remember where we came from so "previous" can return to it
            if self.current_index is not None:
                self.playback_history.append(self.current_index)
                print(f"[DEBUG] set_current_index(): Added {self.current_index} to history before jumping to {index}")

            # A manual jump resets shuffle predictions and the previous/next chain
            self.forward_queue = []
            self.forward_history = []
            print(f"[DEBUG] set_current_index(): Cleared forward queue and history due to manual song selection (jumped to {index})")

        self.current_index = index
        return True

    def add_song(self, song):
        """
        Add a song to the end of the queue.

        Args:
            song (dict): Song dictionary to add to the queue
        """
        self.songs.append(song)
        self._update_play_order()
        print(f"Added song to queue: {song.get('title', 'Unknown')}")

    def add_songs(self, songs):
        """
        Add multiple songs to the end of the queue.

        Args:
            songs (list): List of song dictionaries to add to the queue
        """
        self.songs.extend(songs)
        # Queue changed: drop history and predictions
        self._update_play_order()
        print(f"Added {len(songs)} songs to queue")

    def remove_song(self, index):
        """
        Remove a song from the queue by index.

        Args:
            index (int): Index of song to remove

        Returns:
            bool: True if song was removed, False if index was invalid
        """
        if not 0 <= index < len(self.songs):
            return False

        removed_song = self.songs.pop(index)

        # Keep current_index pointing at the same song where possible
        if index < self.current_index:
            self.current_index -= 1
        elif index == self.current_index and self.current_index >= len(self.songs):
            self.current_index = max(0, len(self.songs) - 1)

        self._update_play_order()
        print(f"Removed song from queue: {removed_song.get('title', 'Unknown')}")
        return True

    def shuffle_queue(self):
        """
        Shuffle the entire queue by randomly reordering all songs.

        This physically reorders the songs list in place. The currently
        selected song stays selected: ``current_index`` is moved to its new
        position.

        Returns:
            bool: True if queue was shuffled, False if queue is empty
        """
        if not self.songs:
            return False

        playing = self.current_song()
        random.shuffle(self.songs)

        if playing is not None:
            # Locate by identity so an equal-looking duplicate is never picked
            self.current_index = next(
                (i for i, song in enumerate(self.songs) if song is playing),
                self.current_index,
            )
        else:
            self.current_index = 0

        # Old positions are meaningless after reordering
        self._update_play_order()
        print("Queue shuffled - song order randomized")
        return True

    def play_random_song(self):
        """
        Jump to a completely random song in the queue.

        This doesn't reorder the queue, just changes the current playing
        position. The jump is not recorded in the playback history.

        Returns:
            bool: True if jumped to random song, False if queue is empty
        """
        if not self.songs:
            return False

        random_index = random.randint(0, len(self.songs) - 1)
        self.current_index = random_index

        current_song = self.current_song()
        song_title = current_song.get('title', 'Unknown') if current_song else 'Unknown'
        print(f"Jumped to random song: {song_title} (position {random_index + 1}/{len(self.songs)})")
        return True

    def clear_queue(self):
        """Clear all songs from the queue and reset all navigation state."""
        self.songs.clear()
        self.current_index = 0
        self._update_play_order()
        print("Queue cleared")

    def handle_song_finished(self):
        """
        Handle when current song finishes playing.

        Auto-skips missing files during progression.

        Returns:
            tuple: (should_continue: bool, next_song: dict or None)
                should_continue: True if playback should continue
                next_song: The next song to play, or None if should stop
        """
        current_song = self.current_song()
        if current_song:
            print(f"Song finished: {current_song.get('title', 'Unknown')}")

        # Use next_track_skip_missing to skip missing files during auto-progression
        if self.next_track_skip_missing():
            next_song = self.current_song()
            if next_song:
                next_title = next_song.get('title', 'Unknown')
                if self.repeat_mode == RepeatMode.TRACK:
                    print(f"Repeating track: {next_title}")
                else:
                    print(f"Moving to next song: {next_title} (#{self.current_index + 1}/{len(self.songs)})")
                return True, next_song

        print("Queue finished - no more songs to play")
        return False, None
def _skip_unplayable_song(queue_manager, consecutive_failures):
    """Move past a song that could not be played; return False to stop playback."""
    if consecutive_failures >= len(queue_manager.songs):
        # Every song in a row failed: stop instead of spinning forever
        print("No playable songs left in queue.")
        return False
    if queue_manager.repeat_mode == RepeatMode.TRACK:
        # next_track() would stay on the broken track, so step forward by hand
        return queue_manager.set_current_index(queue_manager.current_index + 1)
    return queue_manager.next_track()


def play_queue_with_manager(songs, repeat_mode="off", shuffle=False, start_index=0, conn=None):
    """
    Play songs using QueueManager with dynamic repeat mode support.

    This approach handles looping at the queue level rather than player level.
    Songs whose file is missing or fails to play are skipped; playback stops
    once as many consecutive songs as the queue holds have failed.

    Args:
        songs (list): List of song dictionaries or database records.
        repeat_mode (str): Repeat mode - "off", "track", or "queue"
        shuffle (bool): Enable shuffle mode
        start_index (int): Index to start playback from
        conn (sqlite3.Connection): Database connection for auto-adding missing songs
    """
    if not songs:
        print("Queue is empty. Nothing to play.")
        return

    # Create queue manager
    queue_manager = QueueManager(songs)
    queue_manager.set_repeat_mode(repeat_mode)
    queue_manager.set_shuffle_mode(shuffle)
    queue_manager.set_current_index(start_index)

    print(f"\n=== Playing {len(songs)} songs ===")
    print(f"Repeat mode: {repeat_mode}")
    print(f"Shuffle: {'ON' if shuffle else 'OFF'}")
    print("Press Ctrl+C to control playback")
    print("Controls: 'q'=quit, 'n'=next, 's'=toggle shuffle, 'i'=instant shuffle, 'r'=repeat mode\n")

    consecutive_failures = 0  # Reset whenever a song plays successfully

    while True:
        try:
            current_song = queue_manager.current_song()
            if not current_song:
                print("No current song available.")
                break

            # Display current queue status
            display_queue(songs, queue_manager.current_index)
            print(f"Now playing: {format_song_info(current_song)}")

            # Get file path (strip the file:// scheme if present)
            file_path = current_song['url']
            if file_path.startswith('file://'):
                file_path = file_path[7:]

            # Check if file exists
            if not os.path.exists(file_path):
                print(f"Warning: File not found: {file_path}")
                print("Skipping to next song...")
                consecutive_failures += 1
                if not _skip_unplayable_song(queue_manager, consecutive_failures):
                    break
                continue

            # Auto-add missing song to database if connection is available
            if conn and DATABASE_AVAILABLE:
                try:
                    add_missing_song_to_database(file_path, conn)
                except Exception as e:
                    print(f"Note: Could not auto-add song to database: {e}")

            # Play the song
            if play_audio(file_path):
                consecutive_failures = 0
                # Move to next track based on repeat mode
                if not queue_manager.next_track():
                    print("\nReached end of queue.")
                    break
            else:
                print("Error playing song. Skipping to next...")
                consecutive_failures += 1
                if not _skip_unplayable_song(queue_manager, consecutive_failures):
                    print("\nReached end of queue.")
                    break

        except KeyboardInterrupt:
            print("\nPlayback interrupted by user.")
            user_input = input("Enter 'q' to quit, 'n' for next song, 's' to toggle shuffle, 'i' for instant shuffle, 'r' to change repeat mode, or any other key to continue: ").lower()
            if user_input == 'q':
                break
            elif user_input == 'n':
                if not queue_manager.next_track():
                    print("End of queue reached.")
                    break
            elif user_input == 's':
                queue_manager.set_shuffle_mode(not queue_manager.shuffle_mode)
            elif user_input == 'i':
                # Instant shuffle - jump to random song immediately
                if queue_manager.play_random_song():
                    print("Jumped to random song!")
                else:
                    print("Could not shuffle - empty queue.")
            elif user_input == 'r':
                print("Repeat modes: off, track, queue")
                new_mode = input("Enter repeat mode: ").strip().lower()
                if new_mode in ['off', 'track', 'queue']:
                    queue_manager.set_repeat_mode(new_mode)
                else:
                    print("Invalid repeat mode.")
        except Exception as e:
            print(f"Error during playback: {e}")
            consecutive_failures += 1
            if not _skip_unplayable_song(queue_manager, consecutive_failures):
                break

    print("Playback finished.")
def play_single_song_with_loop(song_path, repeat_mode="off"):
    """
    Build a one-song queue for a single audio file.

    Intended for GUI integration: no playback is started here, the caller
    drives the returned manager.

    Args:
        song_path (str): Path to the audio file
        repeat_mode (str): "off", "track", or "queue"

    Returns:
        QueueManager: The queue manager instance for external control
    """
    # Minimal song record; the title is the file name without its extension
    title = Path(song_path).stem
    single_song = dict(
        url=song_path,
        title=title,
        artist='Unknown Artist',
        album='Unknown Album',
    )

    manager = QueueManager([single_song])
    manager.set_repeat_mode(repeat_mode)

    print(f"Playing: {single_song['title']}")
    print(f"Repeat mode: {repeat_mode}")
    return manager
def connect_to_database(db_path):
    """
    Connect to the SQLite database and return connection.

    The file must already exist; a missing database is reported rather than
    created.

    Args:
        db_path (str): Path to the SQLite database file.

    Returns:
        sqlite3.Connection or None: Database connection object with
        ``sqlite3.Row`` as its row factory, or None if the file is missing or
        the connection fails.
    """
    if not os.path.exists(db_path):
        print(f"Error: Database file '{db_path}' not found.")
        print("Please run database.py first to create the database.")
        return None

    try:
        conn = sqlite3.connect(db_path)
    except sqlite3.Error as e:
        print(f"Error connecting to database: {e}")
        return None

    conn.row_factory = sqlite3.Row  # Enable dict-like access to rows
    return conn
def get_songs_from_database(conn, filters=None):
    """
    Fetch available songs from the database, optionally narrowed by filters.

    Args:
        conn (sqlite3.Connection): Database connection object.
        filters (dict, optional): Filter criteria. Supported keys: 'artist'
            (matched against artist or albumartist), 'album', 'genre'. Each is
            a substring match via SQL LIKE; empty values are ignored.

    Returns:
        list: Matching rows ordered by artist, album, disc, track. An empty
        list is returned when the query fails.
    """
    cursor = conn.cursor()

    # Rows flagged unavailable are always excluded
    conditions = ["unavailable = 0"]
    params = []

    criteria = filters if filters else {}

    artist = criteria.get('artist')
    if artist:
        pattern = f"%{artist}%"
        conditions.append("(artist LIKE ? OR albumartist LIKE ?)")
        params += [pattern, pattern]

    # album and genre each map to a single column
    for column in ('album', 'genre'):
        wanted = criteria.get(column)
        if wanted:
            conditions.append(f"{column} LIKE ?")
            params.append(f"%{wanted}%")

    query = (
        " SELECT id, title, artist, album, albumartist, url, length, track, disc, year, genre"
        " FROM songs"
        " WHERE " + " AND ".join(conditions) +
        # Logical playback order
        " ORDER BY artist, album, disc, track"
    )

    try:
        cursor.execute(query, params)
        return cursor.fetchall()
    except sqlite3.Error as e:
        print(f"Error querying database: {e}")
        return []
def _song_field(song, key):
    """Return song[key], or None when the record has no such field."""
    try:
        return song[key]
    except (KeyError, IndexError):
        # dict raises KeyError, sqlite3.Row raises IndexError for unknown names
        return None


def format_song_info(song):
    """
    Format song information for display with comprehensive metadata.

    Fields that are absent or empty are replaced by placeholders (artist,
    title, album) or simply left out (track, year, duration), so partially
    populated records such as playlist entries can be displayed too.

    Args:
        song (dict or sqlite3.Row): Song record with metadata. Recognized
            fields: artist, albumartist, title, album, length (seconds),
            track, year.

    Returns:
        str: ``"<track>. <artist> - <title> (<albumartist> - <album> (<year>)) [m:ss]"``
        with the optional parts omitted when unknown.
    """
    artist = _song_field(song, 'artist') or "Unknown Artist"
    # Use albumartist if available, otherwise fall back to artist
    albumartist = _song_field(song, 'albumartist') or artist
    title = _song_field(song, 'title') or "Unknown Title"
    album = _song_field(song, 'album') or "Unknown Album"

    # Format duration; length may arrive as int, float or numeric string
    duration = ""
    length = _song_field(song, 'length')
    if length:
        try:
            total_seconds = int(float(length))
        except (TypeError, ValueError, OverflowError):
            total_seconds = None
        if total_seconds is not None:
            minutes, seconds = divmod(total_seconds, 60)
            duration = f" [{minutes}:{seconds:02d}]"

    # Format track number; fall back to the raw text for values like "3/12"
    track = ""
    track_value = _song_field(song, 'track')
    if track_value:
        try:
            track = f"{int(track_value):02d}. "
        except (TypeError, ValueError):
            track = f"{track_value}. "

    # Format year
    year = ""
    year_value = _song_field(song, 'year')
    if year_value:
        year = f" ({year_value})"

    return f"{track}{artist} - {title} ({albumartist} - {album}{year}){duration}"
def display_queue(queue, current_index=0):
    """
    Print the queue as a numbered list, marking the current song.

    Args:
        queue (list): List of song dictionaries or database records.
        current_index (int, optional): Zero-based index of the currently
            playing song. Defaults to 0.
    """
    if not queue:
        print("Queue is empty.")
        return

    print(f"\n=== Audio Queue ({len(queue)} songs) ===")
    for number, entry in enumerate(queue, start=1):
        # number is 1-based for display; current_index is 0-based
        if number - 1 == current_index:
            marker = "> "
        else:
            marker = " "
        print(f"{marker}{number:3d}. {format_song_info(entry)}")
    print()
def play_queue(queue, shuffle=False, repeat=False, repeat_track=False, start_index=0, conn=None):
    """
    Play songs in the queue with various playback options.

    Every way of leaving a song (normal end, skip of a missing file, user
    "next", playback error) goes through the same end-of-queue check, so the
    queue ends, or wraps when ``repeat`` is set, consistently. Playback also
    stops once as many consecutive songs as the queue holds were unplayable.

    Args:
        queue (list): List of song dictionaries or database records.
        shuffle (bool, optional): Enable shuffle mode. Defaults to False.
        repeat (bool, optional): Enable repeat mode (queue repeat). Defaults to False.
        repeat_track (bool, optional): Enable track repeat mode. Defaults to False.
        start_index (int, optional): Position to start playback from. Values
            that are not a valid position fall back to 0. Defaults to 0.
        conn (sqlite3.Connection, optional): Database connection for auto-adding missing songs.
    """
    if not queue:
        print("Queue is empty. Nothing to play.")
        return

    # Positions are mapped through play_order when shuffling
    play_order = list(range(len(queue)))

    if shuffle:
        random.shuffle(play_order)
        print("Shuffle mode enabled.")
    if repeat:
        print("Repeat mode enabled.")
    if repeat_track:
        print("Track repeat mode enabled.")

    # Normalize the starting position so indexing below can never fail
    try:
        current_index = int(start_index)
    except (TypeError, ValueError):
        current_index = 0
    if not 0 <= current_index < len(queue):
        current_index = 0

    unplayable_streak = 0  # Consecutive songs that could not be played

    def advance():
        """Step to the next position; return False when playback should stop."""
        nonlocal current_index
        current_index += 1
        if current_index < len(queue):
            return True
        if repeat:
            current_index = 0
            print("\nRepeating queue from the beginning...")
            return True
        print("\nReached end of queue.")
        return False

    def skip_unplayable():
        """Record a failure and advance; return False when playback should stop."""
        nonlocal unplayable_streak
        unplayable_streak += 1
        if unplayable_streak >= len(queue):
            print("No playable songs in queue.")
            return False
        return advance()

    while True:
        try:
            song_index = play_order[current_index] if shuffle else current_index
            song = queue[song_index]

            # Display current queue status
            display_queue(queue, song_index)
            print(f"Now playing: {format_song_info(song)}")

            # Get the file path from the URL (remove file:// prefix if present)
            file_path = song['url']
            if file_path.startswith('file://'):
                file_path = file_path[7:]

            # Check if file exists
            if not os.path.exists(file_path):
                print(f"Warning: File not found: {file_path}")
                print("Skipping to next song...")
                if not skip_unplayable():
                    break
                continue

            # Auto-add missing song to database if connection is available
            if conn and DATABASE_AVAILABLE:
                try:
                    add_missing_song_to_database(file_path, conn)
                except Exception as e:
                    print(f"Note: Could not auto-add song to database: {e}")

            # Play the song
            if play_audio(file_path):
                unplayable_streak = 0
                if repeat_track:
                    # Track repeat: stay on the same song
                    continue
                if not advance():
                    break
            else:
                # A failing track is never repeated, even in track repeat mode
                print("Error playing song. Skipping to next...")
                if not skip_unplayable():
                    break

        except KeyboardInterrupt:
            print("\nPlayback interrupted by user.")
            user_input = input("Enter 'q' to quit, 'n' for next song, or any other key to continue: ").lower()
            if user_input == 'q':
                break
            if user_input == 'n' and not advance():
                break
            # Any other key: resume with the current song
        except Exception as e:
            print(f"Error during playback: {e}")
            if not skip_unplayable():
                break
def add_missing_song_to_database(file_path, conn):
    """
    Add missing song to database automatically during playback.

    Nothing is written unless metadata could be extracted. On success the
    transaction is committed; on failure it is rolled back so no partial
    directory row is left pending on the connection.

    Args:
        file_path (str): Path to the audio file to add.
        conn (sqlite3.Connection): Database connection object.

    Returns:
        bool: True if a new song row was inserted, False if the song already
        exists, the insert was ignored, metadata was unavailable, or an
        error occurred.
    """
    if not DATABASE_AVAILABLE:
        return False

    try:
        cursor = conn.cursor()
        file_url = f"file://{file_path}"

        # Check if song already exists
        cursor.execute('SELECT id FROM songs WHERE url = ?', (file_url,))
        if cursor.fetchone():
            return False

        print(f" Adding missing song to database: {os.path.basename(file_path)}")

        # Extract metadata before touching the database so a failure here
        # leaves nothing half-written
        metadata = extract_metadata(file_path)
        if metadata is None:
            print(f" Warning: Could not extract metadata from {file_path}")
            return False

        stat = os.stat(file_path)
        dir_path = str(Path(file_path).parent)

        # Register the containing directory (best effort)
        try:
            dir_stat = os.stat(dir_path)
            cursor.execute(
                'INSERT OR IGNORE INTO directories (path, mtime) VALUES (?, ?)',
                (dir_path, int(dir_stat.st_mtime)),
            )
        except (OSError, sqlite3.Error) as e:
            # Directory might not exist or be accessible; the song can still be added
            print(f" Note: Could not record directory '{dir_path}': {e}")

        cursor.execute('SELECT id FROM directories WHERE path = ?', (dir_path,))
        result = cursor.fetchone()
        directory_id = result[0] if result else None

        # Generate IDs
        fingerprint = get_file_hash(file_path)
        artist_id = hashlib.md5(metadata['artist'].encode()).hexdigest() if metadata['artist'] else ''
        album_id = (
            hashlib.md5(f"{metadata['albumartist'] or metadata['artist']}:{metadata['album']}".encode()).hexdigest()
            if metadata['album'] else ''
        )

        file_ext = Path(file_path).suffix.lower()

        # Column -> value mapping; placeholders are derived from it so the
        # column list and the value list can never drift apart
        row = {
            'title': metadata['title'],
            'album': metadata['album'],
            'artist': metadata['artist'],
            'albumartist': metadata['albumartist'],
            'track': metadata['track'],
            'disc': metadata['disc'],
            'year': metadata['year'],
            'originalyear': metadata['originalyear'],
            'genre': metadata['genre'],
            'composer': metadata['composer'],
            'performer': metadata['performer'],
            'grouping': metadata['grouping'],
            'comment': metadata['comment'],
            'lyrics': metadata['lyrics'],
            'url': file_url,
            'directory_id': directory_id,
            'basefilename': Path(file_path).name,
            'filetype': file_ext[1:] if file_ext else '',
            'filesize': stat.st_size,
            'mtime': int(stat.st_mtime),
            'ctime': int(stat.st_ctime),
            'length': metadata['length'],
            'bitrate': metadata['bitrate'],
            'samplerate': metadata['samplerate'],
            'bitdepth': metadata['bitdepth'],
            'compilation': metadata['compilation'],
            'art_embedded': metadata['art_embedded'],
            'fingerprint': fingerprint,
            'song_id': fingerprint,
            'artist_id': artist_id,
            'album_id': album_id,
            'lastseen': int(time.time()),
            'source': 4,  # source = 4 (Auto-added during playback)
        }
        columns = ", ".join(row)
        placeholders = ", ".join("?" for _ in row)
        cursor.execute(
            f"INSERT OR IGNORE INTO songs ({columns}) VALUES ({placeholders})",
            tuple(row.values()),
        )
        inserted = cursor.rowcount > 0  # 0 when OR IGNORE suppressed the insert

        conn.commit()
        if inserted:
            print(f" Added: {metadata['artist']} - {metadata['title']}")
        return inserted

    except Exception as e:
        print(f" Error adding song to database: {e}")
        try:
            conn.rollback()
        except Exception:
            # Connection unusable; nothing more can be cleaned up here
            pass
        return False
def interactive_mode(conn):
    """
    Interactive mode for queue management.

    Provides a command-line interface for managing audio queues with commands
    for filtering, loading, and playing songs. The loop ends on 'quit'/'q',
    Ctrl+C, or end of input.

    Args:
        conn (sqlite3.Connection): Database connection object.
    """
    queue = []
    filters = {}

    print("\n=== Interactive Audio Queue Mode ===")
    print("Commands:")
    print(" list - Show all songs in library")
    print(" filter - Set filters (artist, album, genre)")
    print(" load - Load songs based on current filters")
    print(" playlist - Load songs from M3U playlist file")
    print(" show - Show current queue")
    print(" play - Play current queue")
    print(" shuffle - Toggle shuffle mode")
    print(" repeat - Toggle repeat mode")
    print(" clear - Clear current queue")
    print(" quit - Exit interactive mode")
    print()

    shuffle_mode = False
    repeat_mode = False

    while True:
        try:
            command = input("queue> ").strip().lower()

            if command == 'quit' or command == 'q':
                break

            elif command == 'list':
                songs = get_songs_from_database(conn)
                if songs:
                    print(f"\nFound {len(songs)} songs in library:")
                    for i, song in enumerate(songs[:20]):  # Show first 20
                        print(f" {i+1:3d}. {format_song_info(song)}")
                    if len(songs) > 20:
                        print(f" ... and {len(songs) - 20} more")
                else:
                    print("No songs found in library.")

            elif command == 'filter':
                print("Set filters (press Enter to skip):")
                artist = input("Artist: ").strip()
                album = input("Album: ").strip()
                genre = input("Genre: ").strip()

                filters = {}
                if artist:
                    filters['artist'] = artist
                if album:
                    filters['album'] = album
                if genre:
                    filters['genre'] = genre
                print(f"Filters set: {filters}")

            elif command == 'load':
                songs = get_songs_from_database(conn, filters)
                if songs:
                    queue = list(songs)
                    print(f"Loaded {len(queue)} songs into queue.")
                else:
                    print("No songs found matching current filters.")

            elif command == 'show':
                display_queue(queue)

            elif command == 'play':
                if queue:
                    # Keyword arguments: passed positionally, conn would land
                    # in start_index and 0 in repeat_track
                    play_queue(queue, shuffle=shuffle_mode, repeat=repeat_mode,
                               start_index=0, conn=conn)
                else:
                    print("Queue is empty. Use 'load' to add songs first.")

            elif command == 'shuffle':
                shuffle_mode = not shuffle_mode
                print(f"Shuffle mode: {'ON' if shuffle_mode else 'OFF'}")

            elif command == 'repeat':
                repeat_mode = not repeat_mode
                print(f"Repeat mode: {'ON' if repeat_mode else 'OFF'}")

            elif command == 'playlist':
                playlist_path = input("Enter playlist file path: ").strip()
                if not os.path.exists(playlist_path):
                    print(f"Error: Playlist file '{playlist_path}' not found.")
                    continue
                songs = load_m3u_playlist(playlist_path)
                if songs:
                    queue = list(songs)
                    print(f"Loaded {len(queue)} songs from playlist '{playlist_path}'.")
                else:
                    print("No songs found in playlist or failed to load.")

            elif command == 'clear':
                queue = []
                print("Queue cleared.")

            elif command == 'help':
                print("Commands: list, filter, load, playlist, show, play, shuffle, repeat, clear, quit")

            else:
                print("Unknown command. Type 'help' for available commands.")

        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed; without this the generic handler
            # below would report it and loop forever
            print("\nExiting interactive mode...")
            break
        except Exception as e:
            print(f"Error: {e}")
def main():
    """
    Main function for audio queue management command-line interface.

    Parses command-line arguments and performs queue operations including
    database filtering, playlist loading, and various playback modes. Exits
    with status 1 when the playlist or database cannot be used or no songs
    match.

    Examples:
        Play all songs by an artist with shuffle:
            python queue.py --artist "Pink Floyd" --shuffle
        Play specific album on repeat:
            python queue.py --album "Dark Side of the Moon" --repeat
        Play from external playlist:
            python queue.py --playlist myplaylist.m3u
        Interactive mode with genre filter:
            python queue.py --genre "Rock" --interactive
        Custom database path:
            python queue.py --db-path ~/music.db --shuffle
    """
    parser = argparse.ArgumentParser(
        description="Audio Queue Manager - Play songs from your music library",
        epilog="Example: python queue.py --artist 'Pink Floyd' --shuffle OR python queue.py --playlist myplaylist.m3u"
    )
    parser.add_argument(
        "--db-path",
        default=DEFAULT_DB_PATH,
        help=f"Path to the SQLite database file (default: {DEFAULT_DB_PATH})"
    )
    parser.add_argument("--shuffle", action="store_true", help="Shuffle the queue")
    parser.add_argument("--repeat", action="store_true", help="Repeat the queue when it ends")
    parser.add_argument("--artist", help="Filter songs by artist name (partial match)")
    parser.add_argument("--album", help="Filter songs by album name (partial match)")
    parser.add_argument("--genre", help="Filter songs by genre (partial match)")
    parser.add_argument("--interactive", action="store_true", help="Start in interactive mode")
    parser.add_argument("--list", action="store_true", help="List all songs in the library and exit")
    parser.add_argument("--playlist", help="Load songs from an M3U playlist file")

    args = parser.parse_args()

    # Playlist mode - load from M3U file (no database needed for loading, but can auto-add)
    if args.playlist:
        if not os.path.exists(args.playlist):
            print(f"Error: Playlist file '{args.playlist}' not found.")
            sys.exit(1)

        songs = load_m3u_playlist(args.playlist)
        if not songs:
            print("No songs found in playlist or failed to load playlist.")
            sys.exit(1)

        print(f"Loaded {len(songs)} songs from playlist '{args.playlist}'.")

        # Try to connect to database for auto-adding missing songs (optional)
        conn = None
        if DATABASE_AVAILABLE:
            try:
                if os.path.exists(args.db_path):
                    conn = sqlite3.connect(args.db_path)
                    conn.row_factory = sqlite3.Row
                    print(f"Connected to database for auto-adding missing songs: {args.db_path}")
                else:
                    print(f"Database not found ({args.db_path}). Songs will play without auto-adding to database.")
            except Exception as e:
                print(f"Could not connect to database: {e}")
                conn = None

        try:
            # Keyword arguments: passed positionally, conn would land in
            # start_index and 0 in repeat_track
            play_queue(list(songs), shuffle=args.shuffle, repeat=args.repeat,
                       start_index=0, conn=conn)
        finally:
            if conn:
                conn.close()
        return

    # Connect to database for database-based operations
    conn = connect_to_database(args.db_path)
    if not conn:
        sys.exit(1)

    try:
        # List mode - just show songs and exit
        if args.list:
            songs = get_songs_from_database(conn)
            if songs:
                print(f"Found {len(songs)} songs in library:")
                for i, song in enumerate(songs):
                    print(f" {i+1:3d}. {format_song_info(song)}")
            else:
                print("No songs found in library.")
            return

        # Interactive mode
        if args.interactive:
            interactive_mode(conn)
            return

        # Build filters from command line arguments
        filters = {}
        if args.artist:
            filters['artist'] = args.artist
        if args.album:
            filters['album'] = args.album
        if args.genre:
            filters['genre'] = args.genre

        # Get songs from database
        songs = get_songs_from_database(conn, filters)
        if not songs:
            print("No songs found matching the specified criteria.")
            print("Use --list to see all available songs.")
            sys.exit(1)

        print(f"Found {len(songs)} songs matching criteria.")

        # Play the queue
        play_queue(list(songs), shuffle=args.shuffle, repeat=args.repeat,
                   start_index=0, conn=conn)

    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
    finally:
        conn.close()
# Run file if __name__ == "__main__": main()