Source code for modules.addons.convert

#!/usr/bin/env python3
"""
Audio File Converter using FFmpeg
Copyright (c) 2025 TAPS OSS
Project: https://github.com/TAPSOSS/Walrio
Licensed under the BSD-3-Clause License (see LICENSE file for details)

A flexible audio conversion tool that supports multiple input formats and provides
various conversion options including output format selection, metadata preservation,
bitrate adjustment, and bit depth selection.
"""

import os
import sys
import argparse
import subprocess
import logging
import json
import tempfile
from pathlib import Path
from typing import List, Dict, Any, Tuple, Optional, Union

# Configure logging format
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('AudioConverter')

# Supported formats for conversion
SUPPORTED_OUTPUT_FORMATS = {
    'mp3': {'ext': 'mp3', 'desc': 'MP3 (MPEG Layer III)', 'codec': 'libmp3lame'},
    'aac': {'ext': 'm4a', 'desc': 'AAC (Advanced Audio Coding)', 'codec': 'aac'},
    'opus': {'ext': 'opus', 'desc': 'Opus (Opus Interactive Audio Codec)', 'codec': 'libopus'},
    'ogg': {'ext': 'ogg', 'desc': 'Ogg Vorbis', 'codec': 'libvorbis'},
    'flac': {'ext': 'flac', 'desc': 'FLAC (Free Lossless Audio Codec)', 'codec': 'flac'},
    'alac': {'ext': 'm4a', 'desc': 'ALAC (Apple Lossless Audio Codec)', 'codec': 'alac'},
    'wav': {'ext': 'wav', 'desc': 'WAV (Waveform Audio File Format)', 'codec': 'pcm_s16le'},
    'wv': {'ext': 'wv', 'desc': 'WavPack', 'codec': 'wavpack'},
}

# Default conversion settings
DEFAULT_SETTINGS = {
    'output_format': 'flac',
    'metadata': 'y',  # y = yes, n = no
    'bitrate': '320k',
    'bit_depth': '16',
    'sample_rate': '48000',
    'channels': '2',
    'quality': 'maximum',  # standard, high, maximum
    'skip_existing': False,
    'recursive': False,
    'force_overwrite': False,  # Don't force overwrite by default
}

# Bitrate presets for various formats (in kbps)
BITRATE_PRESETS = {
    'mp3': {'low': '96k', 'medium': '192k', 'high': '320k'},
    'aac': {'low': '96k', 'medium': '192k', 'high': '256k'},
    'opus': {'low': '64k', 'medium': '128k', 'high': '256k'},
    'ogg': {'low': '96k', 'medium': '192k', 'high': '256k'},
}

# Bit depth options
BIT_DEPTH_OPTIONS = {
    '16': {'desc': '16-bit (CD Quality)', 'pcm_codec': 'pcm_s16le'},
    '24': {'desc': '24-bit (Studio Quality)', 'pcm_codec': 'pcm_s24le'},
    '32': {'desc': '32-bit (Float)', 'pcm_codec': 'pcm_f32le'},
}

# Quality presets mapping
QUALITY_PRESETS = {
    'low': {'desc': 'Lower quality, smaller files'},
    'standard': {'desc': 'Standard quality, balanced file size'},
    'high': {'desc': 'High quality, larger files'},
    'maximum': {'desc': 'Maximum quality'},
}

[docs] class AudioConverter: """ Audio file converter using FFmpeg to convert between various audio formats with options for metadata preservation, bitrate, bit depth, and other settings. """
[docs] def __init__(self, options: Dict[str, Any]): """ Initialize the AudioConverter with the specified options. Args: options (dict): Dictionary of conversion options """ self.options = DEFAULT_SETTINGS.copy() self.options.update(options) # Validate FFmpeg availability self._check_ffmpeg()
def _check_ffmpeg(self): """ Check if FFmpeg is available and get version information. Raises: RuntimeError: If FFmpeg is not found. """ try: result = subprocess.run( ['ffmpeg', '-version'], capture_output=True, text=True, check=True ) ffmpeg_version = result.stdout.split('\\n')[0] logger.debug(f"Using {ffmpeg_version}") except (subprocess.CalledProcessError, FileNotFoundError): raise RuntimeError( "FFmpeg not found. Please install FFmpeg and make sure it's in your PATH." )
[docs] def get_file_info(self, filepath: str) -> Dict[str, Any]: """ Get detailed information about an audio file using FFprobe. Args: filepath (str): Path to the audio file Returns: dict: Dictionary containing file information """ try: cmd = [ 'ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', filepath ] result = subprocess.run( cmd, capture_output=True, text=True, check=True ) file_info = json.loads(result.stdout) return file_info except (subprocess.CalledProcessError, json.JSONDecodeError) as e: logger.warning(f"Could not get file info for {filepath}: {str(e)}") return {}
[docs] def display_file_info(self, filepath: str): """ Display detailed information about an audio file. Args: filepath (str): Path to the audio file """ file_info = self.get_file_info(filepath) if not file_info: print(f"Could not get information for {filepath}") return print(f"\nFile Information: {os.path.basename(filepath)}") print("-" * 50) # Format information if 'format' in file_info: fmt = file_info['format'] duration = float(fmt.get('duration', '0')) minutes = int(duration / 60) seconds = int(duration % 60) print(f"Format: {fmt.get('format_long_name', 'Unknown')}") print(f"Duration: {minutes}:{seconds:02d}") print(f"Size: {int(fmt.get('size', 0)) / (1024*1024):.2f} MB") print(f"Bitrate: {int(fmt.get('bit_rate', 0)) / 1000:.0f} kbps") # Stream information if 'streams' in file_info: audio_streams = [s for s in file_info['streams'] if s.get('codec_type') == 'audio'] for i, stream in enumerate(audio_streams): print(f"\nAudio Stream #{i+1}:") print(f" Codec: {stream.get('codec_name', 'Unknown')} ({stream.get('codec_long_name', 'Unknown')})") print(f" Sample Rate: {stream.get('sample_rate', 'Unknown')} Hz") print(f" Channels: {stream.get('channels', 'Unknown')}") print(f" Channel Layout: {stream.get('channel_layout', 'Unknown')}") print(f" Bit Depth: {stream.get('bits_per_sample', 'N/A')}-bit") # Metadata if 'format' in file_info and 'tags' in file_info['format']: tags = file_info['format']['tags'] print("\nMetadata:") for key, value in tags.items(): print(f" {key}: {value}")
[docs] def build_ffmpeg_command(self, input_file: str, output_file: str) -> List[str]: """ Build the FFmpeg command for audio conversion based on the options. Args: input_file (str): Path to the input file output_file (str): Path to the output file Returns: list: FFmpeg command as a list of arguments """ output_format = self.options['output_format'] codec = SUPPORTED_OUTPUT_FORMATS[output_format]['codec'] # Start with basic command cmd = ['ffmpeg'] # Add force overwrite flag if specified if self.options.get('force_overwrite', False): cmd.append('-y') cmd.extend(['-i', input_file]) # Add codec cmd.extend(['-c:a', codec]) # Handle bitrate for lossy formats if output_format in ('mp3', 'aac', 'opus', 'ogg'): cmd.extend(['-b:a', self.options['bitrate']]) # Handle bit depth for lossless formats if output_format in ('flac', 'wav', 'alac'): if output_format == 'wav': # For WAV, we need to select the appropriate PCM codec based on bit depth bit_depth = self.options['bit_depth'] if bit_depth in BIT_DEPTH_OPTIONS: pcm_codec = BIT_DEPTH_OPTIONS[bit_depth]['pcm_codec'] cmd[cmd.index('-c:a') + 1] = pcm_codec else: # For other lossless formats, set the bit depth parameter cmd.extend(['-sample_fmt', f's{self.options["bit_depth"]}']) # Set sample rate if specified if self.options.get('sample_rate'): cmd.extend(['-ar', self.options['sample_rate']]) # Set channels if specified if self.options.get('channels'): cmd.extend(['-ac', self.options['channels']]) # Quality settings for specific formats if output_format == 'mp3' and codec == 'libmp3lame': # MP3 quality setting (0-9, where 0 is best) quality = {'low': '5', 'standard': '2', 'high': '0', 'maximum': '0'} cmd.extend(['-q:a', quality.get(self.options['quality'], '2')]) elif output_format == 'opus': # Opus has excellent quality even at low bitrates if self.options['quality'] == 'maximum': cmd.extend(['-compression_level', '10']) elif output_format == 'flac': # FLAC compression level (0-12, where 12 is highest compression) flac_compression = {'low': '5', 'standard': '8', 'high': '10', 'maximum': '12'} cmd.extend(['-compression_level', flac_compression.get(self.options['quality'], '8')]) # Handle metadata preservation if self.options['metadata'] == 'n': cmd.extend(['-map_metadata', '-1']) else: # Always map metadata from input to output cmd.extend(['-map_metadata', '0']) # Special handling for Opus format which needs specific treatment for album art if output_format == 'opus': # For Opus, we only map audio stream in the initial command # Album art will be handled separately after audio conversion cmd = [x for x in cmd if x != '-map' or x == '-map_metadata'] # Remove any map options except metadata cmd.extend(['-map', '0:a:0']) # Only map first audio stream else: # For other formats with album art support cmd.extend(['-map', '0']) # Map all streams cmd.extend(['-c:v', 'copy']) # Copy album art as-is if output_format == 'mp3': # For MP3, ensure album art is properly tagged cmd.extend(['-id3v2_version', '3', '-write_id3v1', '1']) # Set the output file cmd.append(output_file) return cmd
[docs] def convert_file(self, input_file: str, output_dir: Optional[str] = None) -> bool: """ Convert a single audio file to the specified format. Args: input_file (str): Path to the input file output_dir (str, optional): Output directory. If None, use the input directory. Returns: bool: True if conversion was successful, False otherwise """ if not os.path.isfile(input_file): logger.error(f"Input file does not exist: {input_file}") return False # Get output directory if output_dir is None: output_dir = os.path.dirname(input_file) # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Determine output filename output_format = self.options['output_format'] output_ext = SUPPORTED_OUTPUT_FORMATS[output_format]['ext'] base_name = os.path.splitext(os.path.basename(input_file))[0] output_file = os.path.join(output_dir, f"{base_name}.{output_ext}") # Check if output file already exists and skip_existing is True if os.path.exists(output_file) and self.options['skip_existing']: logger.info(f"Skipping existing file: {output_file}") return True # Get file information for logging file_info = self.get_file_info(input_file) input_format = file_info.get('format', {}).get('format_name', 'unknown') # Check if file has album art has_album_art = False album_art_streams = [] if 'streams' in file_info: for i, stream in enumerate(file_info['streams']): if stream.get('codec_type') == 'video' and 'attached_pic' in stream.get('disposition', {}): has_album_art = True album_art_streams.append(i) logger.info(f"Converting {input_file} ({input_format}) to {output_format}") if has_album_art: if self.options['metadata'] == 'y': if output_format == 'opus': logger.info(f"Album art detected - will be properly embedded in Opus file using mutagen") else: logger.info(f"Album art detected - will be preserved in output file") else: logger.info(f"Album art detected but will be removed (--metadata n specified)") # Build FFmpeg command cmd = self.build_ffmpeg_command(input_file, output_file) logger.debug(f"FFmpeg command: {' '.join(cmd)}") try: # Run FFmpeg with a timeout logger.info(f"Starting FFmpeg process for audio conversion") try: process = subprocess.run( cmd, capture_output=True, text=True, check=False, timeout=300 # Set a 5-minute timeout for the conversion process ) # Log command output for debugging if process.stdout: logger.debug(f"FFmpeg stdout: {process.stdout}") # Check if user declined to overwrite the file if process.returncode != 0: # Look for the specific "not overwriting" message in stderr if "not overwriting" in process.stderr.lower() or "file exists" in process.stderr.lower(): logger.info(f"User declined to overwrite {output_file}") else: logger.error(f"FFmpeg error: {process.stderr}") return False logger.info(f"Initial audio conversion completed successfully") except subprocess.TimeoutExpired: logger.error(f"FFmpeg process timed out after 5 minutes") return False # Special handling for Opus files with album art if output_format == 'opus' and has_album_art and self.options['metadata'] == 'y': # Extract album art to temporary file logger.info("Extracting album art for Opus embedding") temp_art_file = f"{output_file}.albumart.jpg" # Get the first album art stream art_stream_index = album_art_streams[0] if album_art_streams else 0 # Extract album art using FFmpeg (force overwrite for temp files) art_cmd = [ 'ffmpeg', '-y', '-i', input_file, # -y is ok here since it's a temp file '-an', '-vcodec', 'copy', '-map', f'0:{art_stream_index}', temp_art_file ] try: logger.info(f"Extracting album art to {temp_art_file}") logger.debug(f"Album art extraction command: {' '.join(art_cmd)}") art_process = subprocess.run( art_cmd, capture_output=True, text=True, check=False, timeout=60 # Set a 1-minute timeout for art extraction ) if art_process.returncode == 0 and os.path.exists(temp_art_file): # Now embed the extracted art into the Opus file using mutagen via python script logger.info(f"Album art extracted successfully to {temp_art_file}") logger.info("Embedding album art in Opus file using mutagen") # Create a small Python script to embed the cover using mutagen temp_script = f"{output_file}.embed_art.py" with open(temp_script, 'w') as f: f.write(f''' import base64 from mutagen.oggopus import OggOpus from mutagen.flac import Picture from PIL import Image import io # Load the image and resize if necessary img = Image.open("{temp_art_file}") img = img.convert("RGB") img = img.resize((1000, 1000), Image.LANCZOS) # Resize to reasonable size buf = io.BytesIO() img.save(buf, format="JPEG") img_data = buf.getvalue() # Create FLAC Picture object pic = Picture() pic.mime = "image/jpeg" pic.type = 3 # Front cover pic.data = img_data pic.desc = "Cover" # Embed in Opus file opus = OggOpus("{output_file}") opus["METADATA_BLOCK_PICTURE"] = [base64.b64encode(pic.write()).decode("ascii")] opus.save() print("Album art embedded successfully!") ''') # Execute the Python script python_cmd = [sys.executable, temp_script] logger.debug(f"Running Python script to embed album art: {' '.join(python_cmd)}") python_process = subprocess.run( python_cmd, capture_output=True, text=True, check=False, timeout=60 # Set a 1-minute timeout for embedding ) if python_process.returncode == 0: logger.info("Successfully embedded album art in Opus file") else: logger.warning(f"Failed to embed album art: {python_process.stderr}") # Clean up temporary files for temp_file in [temp_art_file, temp_script]: if os.path.exists(temp_file): os.remove(temp_file) else: logger.warning(f"Failed to extract album art: {art_process.stderr}") except subprocess.TimeoutExpired: logger.error("Album art processing timed out") except Exception as e: logger.warning(f"Error during album art processing: {str(e)}") logger.info(f"Successfully converted {input_file} to {output_file}") return True except Exception as e: logger.error(f"Error converting {input_file}: {str(e)}") # Clean up any temporary files that might have been created temp_files = [ f"{output_file}.albumart.jpg", f"{output_file}.embed_art.py" ] for tmp_file in temp_files: if os.path.exists(tmp_file): try: os.remove(tmp_file) except: pass return False
[docs] def convert_directory(self, input_dir: str, output_dir: Optional[str] = None) -> Tuple[int, int]: """ Convert all audio files in a directory to the specified format. Args: input_dir (str): Input directory containing audio files output_dir (str, optional): Output directory. If None, use the input directory. Returns: tuple: (number of successful conversions, total number of files attempted) """ if not os.path.isdir(input_dir): logger.error(f"Input directory does not exist: {input_dir}") return (0, 0) # Create output directory if it doesn't exist if output_dir is not None: os.makedirs(output_dir, exist_ok=True) # Get list of audio files audio_extensions = ['.mp3', '.flac', '.wav', '.ogg', '.m4a', '.aac', '.opus', '.wma', '.ape', '.wv'] files_to_convert = [] if self.options['recursive']: # Walk through directory tree recursively for root, _, files in os.walk(input_dir): for file in files: if any(file.lower().endswith(ext) for ext in audio_extensions): files_to_convert.append(os.path.join(root, file)) else: # Non-recursive: just get files in the top directory files_to_convert = [ os.path.join(input_dir, file) for file in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, file)) and any(file.lower().endswith(ext) for ext in audio_extensions) ] total_files = len(files_to_convert) logger.info(f"Found {total_files} audio files to convert") # Convert each file success_count = 0 for i, file in enumerate(files_to_convert, 1): logger.info(f"Processing file {i}/{total_files}: {os.path.basename(file)}") # Determine output path preserving directory structure if recursive if output_dir is not None and self.options['recursive']: rel_path = os.path.relpath(os.path.dirname(file), input_dir) file_output_dir = os.path.join(output_dir, rel_path) os.makedirs(file_output_dir, exist_ok=True) else: file_output_dir = output_dir if self.convert_file(file, file_output_dir): success_count += 1 return (success_count, total_files)
[docs] def parse_arguments(): """ Parse command line arguments. Returns: argparse.Namespace: Parsed arguments """ parser = argparse.ArgumentParser( description="Audio File Converter using FFmpeg", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="Examples:\n" " # Convert a single file to MP3\n" " python convert.py input.wav --format mp3\n\n" " # Convert multiple files at once\n" " python convert.py file1.wav file2.mp3 file3.flac --format ogg\n\n" " # Convert a directory of files to FLAC (default format)\n" " python convert.py /music/input\n\n" " # Convert files from multiple directories\n" " python convert.py /music/input1 /music/input2 -o /music/output\n\n" " # Explicitly specify that inputs are files\n" " python convert.py file1 file2 --type file\n\n" " # Convert files recursively without metadata, high logging level\n" " python convert.py /music/input -o /music/output -r --metadata n --logging high\n\n" " # Force overwrite existing files without prompting\n" " python convert.py input.flac -f opus -y\n" ) # Input/output options parser.add_argument( "input", nargs='+', # Accept one or more input files/directories help="Input file(s) or directory to convert" ) parser.add_argument( "--type", choices=["file", "directory", "auto"], default="auto", help="Explicitly specify if inputs are files or a directory (default: auto-detect)" ) parser.add_argument( "-o", "--output", help="Output directory (default: same as input)" ) parser.add_argument( "-r", "--recursive", action="store_true", help="Recursively process directories" ) parser.add_argument( "--skip-existing", action="store_true", help="Skip existing files" ) parser.add_argument( "-y", "--force-overwrite", action="store_true", help="Force overwrite of existing files without prompting" ) # Format options parser.add_argument( "-f", "--format", choices=list(SUPPORTED_OUTPUT_FORMATS.keys()), help="Output format (default: flac)" ) parser.add_argument( "--metadata", choices=["y", "n"], default="y", help="Preserve metadata: y=yes (default), n=no" ) # Audio quality options parser.add_argument( "-b", "--bitrate", help="Audio bitrate for lossy formats (e.g., 128k, 256k, 320k)" ) parser.add_argument( "-d", "--bit-depth", choices=list(BIT_DEPTH_OPTIONS.keys()), help="Bit depth for lossless formats" ) parser.add_argument( "--sample-rate", choices=["44100", "48000", "96000", "192000", "320000"], help="Sample rate in Hz" ) parser.add_argument( "--channels", choices=["1", "2"], help="Number of audio channels (1=mono, 2=stereo)" ) parser.add_argument( "-q", "--quality", choices=list(QUALITY_PRESETS.keys()), help="Encoding quality preset" ) # Utility options parser.add_argument( "-i", "--info", action="store_true", help="Display information about the input file and exit" ) parser.add_argument( "--logging", choices=["low", "high"], default="low", help="Logging level: low (default) or high (verbose)" ) parser.add_argument( "--list-formats", action="store_true", help="List supported output formats and exit" ) return parser.parse_args()
[docs] def main(): """ Main function for the audio converter. """ args = parse_arguments() # Set logging level if args.logging == "high": logger.setLevel(logging.DEBUG) # List supported formats and exit if args.list_formats: print("Supported output formats:") for fmt, info in SUPPORTED_OUTPUT_FORMATS.items(): print(f" {fmt}: {info['desc']} (.{info['ext']})") return # Prepare conversion options options = DEFAULT_SETTINGS.copy() if args.format: options['output_format'] = args.format if args.metadata: options['metadata'] = args.metadata if args.bitrate: options['bitrate'] = args.bitrate if args.bit_depth: options['bit_depth'] = args.bit_depth if args.sample_rate: options['sample_rate'] = args.sample_rate if args.channels: options['channels'] = args.channels if args.quality: options['quality'] = args.quality if args.recursive: options['recursive'] = True if args.skip_existing: options['skip_existing'] = True if args.force_overwrite: options['force_overwrite'] = True # Create converter try: converter = AudioConverter(options) # Just display file info and exit if args.info: if len(args.input) > 1: logger.warning("Multiple inputs provided with --info, showing info for the first file only") input_path = args.input[0] # Get first input # Handle --type parameter with --info if args.type == "file" or (args.type == "auto" and os.path.isfile(input_path)): converter.display_file_info(input_path) else: logger.error(f"--info option requires a file input, not a directory") return # Process multiple inputs based on type parameter input_files = [] input_dirs = [] # When a single input is provided, args.input is still a list with one item for input_path in args.input: if args.type == "file": if not os.path.isfile(input_path): logger.error(f"Input was specified as a file, but '{input_path}' is not a file") sys.exit(1) input_files.append(input_path) elif args.type == "directory": if not os.path.isdir(input_path): logger.error(f"Input was specified as a directory, but '{input_path}' is not a directory") sys.exit(1) input_dirs.append(input_path) else: # auto-detect if os.path.isfile(input_path): input_files.append(input_path) elif os.path.isdir(input_path): input_dirs.append(input_path) else: logger.error(f"Input path '{input_path}' doesn't exist") sys.exit(1) # Prepare output directory output_dir = args.output if args.output else None # Process all files first if input_files: logger.info(f"Converting {len(input_files)} individual file(s)") success_count = 0 for file_path in input_files: if converter.convert_file(file_path, output_dir): success_count += 1 logger.info(f"File conversion completed: {success_count}/{len(input_files)} files converted successfully") if success_count < len(input_files): sys.exit(1) # Then process all directories total_dir_files = 0 total_dir_success = 0 for dir_path in input_dirs: logger.info(f"Converting files in directory: {dir_path}") dir_success, dir_total = converter.convert_directory(dir_path, output_dir) total_dir_files += dir_total total_dir_success += dir_success if input_dirs: logger.info(f"Directory conversion completed: {total_dir_success}/{total_dir_files} files converted successfully") if total_dir_success < total_dir_files: sys.exit(1) except Exception as e: logger.error(f"Error: {str(e)}") sys.exit(1)
if __name__ == "__main__": main()