Source code for modules.addons.playlist_cloner

#!/usr/bin/env python3
"""
Playlist Cloner
Copyright (c) 2025 TAPS OSS
Project: https://github.com/TAPSOSS/Walrio
Licensed under the BSD-3-Clause License (see LICENSE file for details)

Clones all audio files from a playlist to a new directory with optional format conversion.
Useful for creating portable collections or converting playlists for specific devices.
"""

import os
import sys
import argparse
import logging
import shutil
from pathlib import Path
from typing import List, Dict, Optional, Tuple

# Add parent directory to path for module imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from addons.convert import AudioConverter, SUPPORTED_OUTPUT_FORMATS, BITRATE_PRESETS

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('PlaylistCloner')


[docs] class PlaylistCloner: """ Clones audio files from a playlist to a destination directory with optional format conversion. """
[docs] def __init__(self, playlist_path: str, output_dir: str, output_format: str = 'opus', bitrate: str = '192k', preserve_structure: bool = False, skip_existing: bool = True, dry_run: bool = False): """ Initialize the PlaylistCloner. Args: playlist_path (str): Path to the M3U playlist file output_dir (str): Destination directory for cloned files output_format (str): Output audio format (default: opus) bitrate (str): Bitrate for lossy formats (default: 192k) preserve_structure (bool): If True, preserve folder structure; if False, flatten skip_existing (bool): Skip files that already exist in destination dry_run (bool): If True, show what would be done without actually doing it """ self.playlist_path = playlist_path self.output_dir = output_dir self.output_format = output_format self.bitrate = bitrate self.preserve_structure = preserve_structure self.skip_existing = skip_existing self.dry_run = dry_run # Statistics self.total_files = 0 self.converted_files = 0 self.copied_files = 0 self.skipped_files = 0 self.error_files = 0 # Validate playlist exists if not os.path.isfile(playlist_path): raise FileNotFoundError(f"Playlist file not found: {playlist_path}") # Validate output format if output_format not in SUPPORTED_OUTPUT_FORMATS: raise ValueError(f"Unsupported output format: {output_format}")
def _load_playlist_paths(self) -> List[str]: """ Load file paths from the M3U playlist. Returns: List[str]: List of absolute file paths """ paths = [] playlist_dir = os.path.dirname(os.path.abspath(self.playlist_path)) try: with open(self.playlist_path, 'r', encoding='utf-8') as f: lines = f.readlines() for line in lines: line = line.strip() # Skip empty lines and comments if not line or line.startswith('#'): continue # Convert relative paths to absolute if not os.path.isabs(line): file_path = os.path.abspath(os.path.join(playlist_dir, line)) else: file_path = line # Check if file exists if os.path.isfile(file_path): paths.append(file_path) else: logger.warning(f"File not found: {file_path}") self.error_files += 1 return paths except Exception as e: logger.error(f"Error loading playlist: {str(e)}") return [] def _get_output_path(self, input_file: str) -> str: """ Determine the output path for a file. Args: input_file (str): Input file path Returns: str: Output file path """ # Get output extension output_ext = SUPPORTED_OUTPUT_FORMATS[self.output_format]['ext'] base_name = os.path.splitext(os.path.basename(input_file))[0] if self.preserve_structure: # Preserve the directory structure relative to the playlist location playlist_dir = os.path.dirname(os.path.abspath(self.playlist_path)) input_abs = os.path.abspath(input_file) # Try to get relative path from playlist directory try: rel_path = os.path.relpath(os.path.dirname(input_abs), playlist_dir) output_subdir = os.path.join(self.output_dir, rel_path) except ValueError: # If files are on different drives, just use basename output_subdir = self.output_dir os.makedirs(output_subdir, exist_ok=True) return os.path.join(output_subdir, f"{base_name}.{output_ext}") else: # Flatten to output directory return os.path.join(self.output_dir, f"{base_name}.{output_ext}") def _needs_conversion(self, input_file: str) -> bool: """ Check if file needs conversion or can be copied. Args: input_file (str): Input file path Returns: bool: True if conversion needed, False if can be copied """ input_ext = os.path.splitext(input_file)[1].lower().lstrip('.') output_ext = SUPPORTED_OUTPUT_FORMATS[self.output_format]['ext'] # If extensions match, no conversion needed return input_ext != output_ext
[docs] def clone_playlist(self) -> Tuple[int, int, int, int]: """ Clone all files from the playlist to the output directory. Returns: Tuple[int, int, int, int]: (total, converted, copied, skipped, errors) """ logger.info(f"Loading playlist: {self.playlist_path}") file_paths = self._load_playlist_paths() if not file_paths: logger.error("No valid files found in playlist") return 0, 0, 0, 0, self.error_files self.total_files = len(file_paths) logger.info(f"Found {self.total_files} files in playlist") logger.info(f"Output directory: {self.output_dir}") logger.info(f"Output format: {self.output_format} @ {self.bitrate}") logger.info(f"Structure: {'Preserved' if self.preserve_structure else 'Flattened'}") if self.dry_run: logger.info("[DRY RUN MODE] - No files will be modified") logger.info("=" * 80) # Create output directory if not self.dry_run: os.makedirs(self.output_dir, exist_ok=True) # Setup converter converter_options = { 'output_format': self.output_format, 'bitrate': self.bitrate, 'metadata': 'y', 'skip_existing': self.skip_existing, 'force_overwrite': False, } converter = AudioConverter(converter_options) # Process each file for idx, input_file in enumerate(file_paths, 1): output_path = self._get_output_path(input_file) filename = os.path.basename(input_file) logger.info(f"[{idx}/{self.total_files}] Processing: {filename}") # Check if output file already exists if self.skip_existing and os.path.exists(output_path): logger.info(f" → Skipped (already exists): {os.path.basename(output_path)}") self.skipped_files += 1 continue if self.dry_run: if self._needs_conversion(input_file): logger.info(f" → Would convert to: {os.path.basename(output_path)}") else: logger.info(f" → Would copy to: {os.path.basename(output_path)}") continue # Check if conversion is needed if self._needs_conversion(input_file): # Convert the file output_subdir = os.path.dirname(output_path) success, reason = converter.convert_file(input_file, output_subdir) if success and reason == 'converted': logger.info(f" ✓ Converted to: {os.path.basename(output_path)}") self.converted_files += 1 elif success and reason in ('already_target_format', 'skipped_existing'): logger.info(f" → Skipped: {reason}") self.skipped_files += 1 else: logger.error(f" ✗ Conversion failed") self.error_files += 1 else: # Copy the file (already in target format) try: os.makedirs(os.path.dirname(output_path), exist_ok=True) shutil.copy2(input_file, output_path) logger.info(f" ✓ Copied to: {os.path.basename(output_path)}") self.copied_files += 1 except Exception as e: logger.error(f" ✗ Copy failed: {str(e)}") self.error_files += 1 logger.info("=" * 80) logger.info(f"Cloning completed!") logger.info(f"Total files: {self.total_files}") logger.info(f"Converted: {self.converted_files}") logger.info(f"Copied: {self.copied_files}") logger.info(f"Skipped: {self.skipped_files}") logger.info(f"Errors: {self.error_files}") return self.total_files, self.converted_files, self.copied_files, self.skipped_files, self.error_files
[docs] def parse_arguments(): """ Parse command line arguments. Returns: argparse.Namespace: Parsed arguments """ parser = argparse.ArgumentParser( description="Clone audio files from a playlist to a new directory with optional format conversion", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""Examples: # Clone playlist to portable device with 192kbps Opus (default) python playlist_cloner.py my_playlist.m3u /media/usb/music/ # Clone with MP3 format at 320kbps python playlist_cloner.py my_playlist.m3u /output/ --format mp3 --bitrate 320k # Clone with preserved folder structure python playlist_cloner.py my_playlist.m3u /output/ --preserve-structure # Clone to FLAC (lossless) - good for archival python playlist_cloner.py my_playlist.m3u /backup/ --format flac # Dry run to see what would happen python playlist_cloner.py my_playlist.m3u /output/ --dry-run Supported output formats: mp3 - MP3 (MPEG Layer III) aac - AAC (Advanced Audio Coding) opus - Opus (default - best quality/size ratio) ogg - Ogg Vorbis flac - FLAC (lossless) alac - Apple Lossless wav - WAV (uncompressed) Common bitrate presets: Opus: 64k (low), 128k (medium), 192k (high - default), 256k (maximum) MP3: 96k (low), 192k (medium), 320k (high) AAC: 96k (low), 192k (medium), 256k (high) """ ) parser.add_argument( 'playlist', help='Path to the M3U playlist file' ) parser.add_argument( 'output_dir', help='Destination directory for cloned files' ) parser.add_argument( '--format', '-f', dest='output_format', choices=list(SUPPORTED_OUTPUT_FORMATS.keys()), default='opus', help='Output audio format (default: opus)' ) parser.add_argument( '--bitrate', '-b', default='192k', help='Bitrate for lossy formats (e.g., 192k, 320k) (default: 192k)' ) parser.add_argument( '--preserve-structure', '-p', action='store_true', help='Preserve the folder structure from source (default: flatten to output directory)' ) parser.add_argument( '--skip-existing', '-s', action='store_true', default=True, help='Skip files that already exist in destination (default: True)' ) parser.add_argument( '--overwrite', '-o', action='store_true', help='Overwrite existing files in destination' ) parser.add_argument( '--dry-run', '-d', action='store_true', help='Show what would be done without actually doing it' ) parser.add_argument( '--verbose', '-v', action='store_true', help='Enable verbose logging' ) return parser.parse_args()
[docs] def main(): """ Main entry point for the playlist cloner. """ args = parse_arguments() # Set logging level if args.verbose: logger.setLevel(logging.DEBUG) logging.getLogger('AudioConverter').setLevel(logging.DEBUG) # Handle overwrite flag skip_existing = not args.overwrite if args.overwrite else args.skip_existing try: # Create playlist cloner cloner = PlaylistCloner( playlist_path=args.playlist, output_dir=args.output_dir, output_format=args.output_format, bitrate=args.bitrate, preserve_structure=args.preserve_structure, skip_existing=skip_existing, dry_run=args.dry_run ) # Clone the playlist total, converted, copied, skipped, errors = cloner.clone_playlist() # Exit with error code if there were errors if errors > 0: sys.exit(1) else: sys.exit(0) except Exception as e: logger.error(f"Fatal error: {str(e)}") sys.exit(1)
if __name__ == '__main__': main()