#!/usr/bin/env python3
"""
Apply Loudness Tool
Copyright (c) 2025 TAPS OSS
Project: https://github.com/TAPSOSS/Walrio
Licensed under the BSD-3-Clause License (see LICENSE file for details)
A tool to apply gain adjustments directly to audio files using FFmpeg while preserving metadata and album art.
Can apply gain based on ReplayGain values or direct dB adjustments.
"""
import os
import sys
import argparse
import subprocess
import logging
import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import tempfile
# Add parent directory to path for module imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
from modules.addons.replaygain import ReplayGainAnalyzer
from modules.core import metadata
# Configure logging format
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('ApplyLoudness')
# Supported audio file extensions
SUPPORTED_EXTENSIONS = {'.flac', '.mp3', '.m4a', '.wav', '.ogg', '.opus'}
[docs]
class LoudnessApplicator:
"""
Audio loudness applicator using FFmpeg for gain adjustment
"""
[docs]
def __init__(self, create_backup: bool = True):
"""
Initialize the loudness applicator.
Args:
create_backup (bool): Whether to create backup files before modification
"""
self.create_backup = create_backup
self.processed_count = 0
self.error_count = 0
self.backup_count = 0
# Validate FFmpeg/FFprobe availability
self._check_ffmpeg()
def _check_ffmpeg(self):
"""
Check if FFmpeg and FFprobe are available.
Raises:
RuntimeError: If FFmpeg or FFprobe is not found.
"""
for tool in ['ffmpeg', 'ffprobe']:
try:
result = subprocess.run(
[tool, '-version'],
capture_output=True,
text=True,
check=True
)
logger.debug(f"{tool} is available")
except (subprocess.CalledProcessError, FileNotFoundError):
raise RuntimeError(
f"{tool} not found. Please install FFmpeg and make sure it's in your PATH."
)
[docs]
def is_supported_file(self, filepath: str) -> bool:
"""
Check if a file is a supported audio file.
Args:
filepath (str): Path to the file
Returns:
bool: True if the file is supported, False otherwise
"""
if not os.path.isfile(filepath):
return False
ext = Path(filepath).suffix.lower()
return ext in SUPPORTED_EXTENSIONS
[docs]
def get_replaygain_value(self, filepath: str, target_lufs: int = -18) -> Optional[float]:
"""
Get ReplayGain value for a file using the ReplayGain analyzer.
Args:
filepath (str): Path to the audio file
target_lufs (int): Target LUFS value for ReplayGain calculation
Returns:
float or None: ReplayGain value in dB, or None if analysis failed
"""
try:
# Create analyzer instance with the target LUFS
analyzer = ReplayGainAnalyzer(target_lufs=target_lufs)
# Analyze the file
result = analyzer.analyze_file(filepath)
if result is None:
logger.error(f"ReplayGain analysis failed for {os.path.basename(filepath)}")
return None
# Extract the gain value
gain_db = result.get('gain_db')
if gain_db is None:
logger.error(f"No gain value found in ReplayGain analysis for {os.path.basename(filepath)}")
return None
if isinstance(gain_db, str):
try:
gain_db = float(gain_db)
except ValueError:
logger.error(f"Invalid gain value in ReplayGain analysis for {os.path.basename(filepath)}: {gain_db}")
return None
logger.debug(f"ReplayGain analysis for {os.path.basename(filepath)}: {result.get('loudness_lufs')} LUFS, {gain_db} dB gain")
return gain_db
except Exception as e:
logger.error(f"Error getting ReplayGain value for {os.path.basename(filepath)}: {str(e)}")
return None
[docs]
def get_audio_properties(self, filepath: str) -> Dict[str, Any]:
"""
Get audio properties from a file using FFprobe.
Args:
filepath (str): Path to the audio file
Returns:
dict: Audio properties including bit depth, sample rate, etc.
"""
try:
cmd = [
"ffprobe", "-v", "error",
"-select_streams", "a:0",
"-show_entries", "stream=bits_per_raw_sample,bits_per_sample,sample_rate,channels",
"-of", "default=noprint_wrappers=1:nokey=1",
str(filepath)
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
logger.warning(f"Could not get audio properties for {os.path.basename(filepath)}")
return {}
lines = result.stdout.strip().splitlines()
properties = {}
if len(lines) >= 1 and lines[0].isdigit():
properties['bits_per_raw_sample'] = int(lines[0])
if len(lines) >= 2 and lines[1].isdigit():
properties['bits_per_sample'] = int(lines[1])
if len(lines) >= 3 and lines[2].isdigit():
properties['sample_rate'] = int(lines[2])
if len(lines) >= 4 and lines[3].isdigit():
properties['channels'] = int(lines[3])
return properties
except Exception as e:
logger.warning(f"Error getting audio properties for {os.path.basename(filepath)}: {str(e)}")
return {}
def _has_album_art(self, filepath: str) -> bool:
"""
Check if a file has album art using FFprobe.
Args:
filepath (str): Path to the audio file
Returns:
bool: True if the file has album art, False otherwise
"""
try:
cmd = [
"ffprobe", "-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=codec_type",
"-of", "csv=p=0",
str(filepath)
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False
)
return result.returncode == 0 and "video" in result.stdout.lower()
except Exception:
return False
def _handle_opus_album_art(self, original_filepath: str, opus_filepath: str):
"""
Handle album art embedding for Opus files using the centralized metadata module.
Args:
original_filepath (str): Path to the original file with album art
opus_filepath (str): Path to the converted Opus file
"""
if not self._has_album_art(original_filepath):
logger.debug(f"No album art detected in {os.path.basename(original_filepath)}")
return
logger.info("Extracting and embedding album art for Opus file using metadata module")
# Extract album art using FFmpeg first
temp_art_file = f"{opus_filepath}.albumart.jpg"
try:
# Extract album art using FFmpeg
art_cmd = [
'ffmpeg', '-y', '-i', str(original_filepath),
'-an', '-vcodec', 'copy',
'-map', '0:v:0', # Map first video stream (album art)
temp_art_file
]
logger.debug(f"Extracting album art to {temp_art_file}")
art_process = subprocess.run(
art_cmd,
capture_output=True,
text=True,
check=False,
timeout=60 # Set a 1-minute timeout for art extraction
)
if art_process.returncode == 0 and os.path.exists(temp_art_file):
logger.debug("Album art extracted successfully")
# Use the centralized metadata module to embed album art
if metadata.set_album_art(opus_filepath, temp_art_file):
logger.debug("Successfully embedded album art in Opus file using metadata module")
else:
logger.warning("Failed to embed album art using metadata module")
else:
logger.warning(f"Failed to extract album art: {art_process.stderr}")
except subprocess.TimeoutExpired:
logger.error("Album art processing timed out")
except Exception as e:
logger.warning(f"Error during album art processing: {str(e)}")
finally:
# Clean up temporary art file
if os.path.exists(temp_art_file):
try:
os.remove(temp_art_file)
except:
pass
[docs]
def apply_gain_to_file(self, filepath: str, gain_db: float, output_dir: Optional[str] = None) -> bool:
"""
Apply gain to a single audio file using FFmpeg while preserving metadata and album art.
Args:
filepath (str): Path to the audio file
gain_db (float): Gain to apply in dB
output_dir (str, optional): Output directory for modified files (None for in-place)
Returns:
bool: True if successful, False otherwise
"""
if not self.is_supported_file(filepath):
logger.warning(f"Unsupported file type: {os.path.basename(filepath)}")
return False
if abs(gain_db) < 0.01: # Effectively no change
logger.info(f"Skipping {os.path.basename(filepath)} - no significant gain change needed ({gain_db:.2f} dB)")
return True
try:
file_path = Path(filepath)
ext = file_path.suffix.lower()
# Determine output file path
if output_dir:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
out_file = output_path / file_path.name
else:
out_file = file_path
# Create backup if requested and modifying in-place
if self.create_backup and not output_dir:
backup_file = file_path.with_suffix(f"{ext}.backup")
if not backup_file.exists():
shutil.copy2(filepath, backup_file)
self.backup_count += 1
logger.debug(f"Created backup: {backup_file}")
# Create temporary file for processing
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file:
temp_path = temp_file.name
try:
# Get audio properties for format-specific encoding
audio_props = self.get_audio_properties(filepath)
# Build FFmpeg command
ffmpeg_cmd = [
"ffmpeg", "-y", "-i", str(filepath),
"-map_metadata", "0", # Preserve all metadata
"-af", f"volume={gain_db}dB", # Apply gain
]
# Special handling for Opus format which needs specific treatment for album art
if ext == ".opus":
# For Opus, we only map audio stream in the initial command
# Album art will be handled separately after audio conversion
ffmpeg_cmd.extend(["-map", "0:a:0"]) # Only map first audio stream
else:
# For other formats with album art support
ffmpeg_cmd.extend(["-map", "0"]) # Map all streams (audio + video/images)
ffmpeg_cmd.extend(["-c:v", "copy"]) # Copy video/image streams (album art) without re-encoding
# Format-specific audio encoding settings
if ext == ".mp3":
ffmpeg_cmd += ["-c:a", "libmp3lame"]
elif ext == ".flac":
ffmpeg_cmd += ["-c:a", "flac"]
# Preserve bit depth for FLAC
if 'bits_per_raw_sample' in audio_props or 'bits_per_sample' in audio_props:
bit_depth = audio_props.get('bits_per_raw_sample', audio_props.get('bits_per_sample', 16))
if bit_depth == 16:
ffmpeg_cmd += ["-sample_fmt", "s16"]
elif bit_depth == 24:
ffmpeg_cmd += ["-sample_fmt", "s32"]
elif bit_depth == 32:
ffmpeg_cmd += ["-sample_fmt", "s32"]
elif ext == ".m4a":
ffmpeg_cmd += ["-c:a", "aac"]
elif ext == ".ogg":
ffmpeg_cmd += ["-c:a", "libvorbis"]
elif ext == ".opus":
ffmpeg_cmd += ["-c:a", "libopus"]
elif ext == ".wav":
# For WAV, preserve the original sample format
if 'bits_per_sample' in audio_props:
bit_depth = audio_props['bits_per_sample']
if bit_depth == 16:
ffmpeg_cmd += ["-c:a", "pcm_s16le"]
elif bit_depth == 24:
ffmpeg_cmd += ["-c:a", "pcm_s24le"]
elif bit_depth == 32:
ffmpeg_cmd += ["-c:a", "pcm_s32le"]
else:
ffmpeg_cmd += ["-c:a", "pcm_s16le"] # Default fallback
else:
ffmpeg_cmd += ["-c:a", "pcm_s16le"] # Default fallback
ffmpeg_cmd.append(temp_path)
# Execute FFmpeg command
logger.debug(f"Running FFmpeg command: {' '.join(ffmpeg_cmd)}")
result = subprocess.run(
ffmpeg_cmd,
capture_output=True,
text=False, # Use binary mode to avoid encoding issues
check=False
)
if result.returncode != 0:
# Decode error output safely
try:
stderr = result.stderr.decode('utf-8', errors='replace') if result.stderr else ""
stdout = result.stdout.decode('utf-8', errors='replace') if result.stdout else ""
except:
stderr = str(result.stderr) if result.stderr else ""
stdout = str(result.stdout) if result.stdout else ""
logger.error(f"FFmpeg failed for {os.path.basename(filepath)}: {stderr or stdout}")
return False
# Special handling for Opus files with album art
if ext == ".opus":
self._handle_opus_album_art(filepath, temp_path)
# Move temporary file to final location
shutil.move(temp_path, str(out_file))
self.processed_count += 1
logger.info(f"Applied {gain_db:+.2f} dB gain to {os.path.basename(filepath)}")
return True
finally:
# Clean up temporary file if it still exists
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
except:
pass
except Exception as e:
logger.error(f"Error applying gain to {os.path.basename(filepath)}: {str(e)}")
self.error_count += 1
return False
[docs]
def process_files(self, file_paths: List[str], gain_db: Optional[float] = None,
use_replaygain: bool = False, target_lufs: int = -18,
output_dir: Optional[str] = None) -> Tuple[int, int]:
"""
Process multiple audio files to apply gain.
Args:
file_paths (list): List of file paths to process
gain_db (float, optional): Fixed gain to apply in dB
use_replaygain (bool): Whether to use ReplayGain values instead of fixed gain
target_lufs (int): Target LUFS for ReplayGain calculation
output_dir (str, optional): Output directory for modified files
Returns:
tuple: (successful_count, total_count)
"""
if not file_paths:
logger.warning("No files to process")
return (0, 0)
if not use_replaygain and gain_db is None:
logger.error("Either gain_db must be specified or use_replaygain must be True")
return (0, 0)
# Filter supported files
supported_files = [f for f in file_paths if self.is_supported_file(f)]
unsupported_count = len(file_paths) - len(supported_files)
if unsupported_count > 0:
logger.warning(f"Skipping {unsupported_count} unsupported files")
if not supported_files:
logger.error("No supported audio files to process")
return (0, 0)
logger.info(f"Processing {len(supported_files)} audio files")
successful_count = 0
for i, filepath in enumerate(supported_files, 1):
logger.debug(f"Processing file {i}/{len(supported_files)}: {os.path.basename(filepath)}")
# Determine gain to apply
if use_replaygain:
file_gain = self.get_replaygain_value(filepath, target_lufs)
if file_gain is None:
logger.error(f"Could not get ReplayGain value for {os.path.basename(filepath)}, skipping")
continue
else:
file_gain = gain_db
# Apply gain
if self.apply_gain_to_file(filepath, file_gain, output_dir):
successful_count += 1
return (successful_count, len(supported_files))
[docs]
def process_directory(self, directory: str, recursive: bool = True,
gain_db: Optional[float] = None, use_replaygain: bool = False,
target_lufs: int = -18, output_dir: Optional[str] = None) -> Tuple[int, int]:
"""
Process all supported audio files in a directory.
Args:
directory (str): Directory to process
recursive (bool): Whether to process subdirectories recursively
gain_db (float, optional): Fixed gain to apply in dB
use_replaygain (bool): Whether to use ReplayGain values instead of fixed gain
target_lufs (int): Target LUFS for ReplayGain calculation
output_dir (str, optional): Output directory for modified files
Returns:
tuple: (successful_count, total_count)
"""
if not os.path.isdir(directory):
logger.error(f"Directory does not exist: {directory}")
return (0, 0)
# Find all supported audio files
file_paths = []
if recursive:
for root, _, files in os.walk(directory):
for file in files:
filepath = os.path.join(root, file)
if self.is_supported_file(filepath):
file_paths.append(filepath)
else:
for file in os.listdir(directory):
filepath = os.path.join(directory, file)
if self.is_supported_file(filepath):
file_paths.append(filepath)
return self.process_files(file_paths, gain_db, use_replaygain, target_lufs, output_dir)
[docs]
def parse_arguments():
"""
Parse command line arguments.
Returns:
argparse.Namespace: Parsed arguments
"""
parser = argparse.ArgumentParser(
description="Apply Loudness Tool - Apply gain adjustments to audio files using FFmpeg",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""Examples:
# Apply fixed +3 dB gain to all files in a directory
python applyloudness.py /path/to/music --gain +3
# Apply -2 dB gain to specific files
python applyloudness.py song1.mp3 song2.flac --gain -2
# Use ReplayGain values with default -18 LUFS target
python applyloudness.py /path/to/music --replaygain
# Use ReplayGain with custom LUFS target (Apple Music standard)
python applyloudness.py /path/to/music --replaygain --target-lufs -16
# Process files and save to output directory (preserve originals)
python applyloudness.py /path/to/music --gain +1.5 --output /path/to/output
# Apply gain without creating backup files
python applyloudness.py /path/to/music --gain -1 --backup false
# Dry run to see what would be processed
python applyloudness.py /path/to/music --gain +2 --dry-run
Supported file formats:
- FLAC (.flac)
- MP3 (.mp3)
- M4A (.m4a)
- WAV (.wav)
- OGG (.ogg)
- Opus (.opus)
LUFS Target Values (for --replaygain mode):
-18 LUFS: ReplayGain 2.0 standard (default)
-16 LUFS: Apple Music standard
-14 LUFS: Spotify, Amazon Music, YouTube standard
-20 LUFS: TV broadcast standard
Requirements:
- FFmpeg (for audio processing)
- rsgain (for ReplayGain mode)
""")
# Input options
parser.add_argument(
"input",
nargs="+",
help="Audio files or directories to process"
)
# Gain mode options (mutually exclusive)
gain_group = parser.add_mutually_exclusive_group(required=True)
gain_group.add_argument(
"--gain",
type=float,
help="Fixed gain to apply in dB (e.g., +3, -2.5)"
)
gain_group.add_argument(
"--replaygain",
action="store_true",
help="Use ReplayGain values calculated from file analysis"
)
# ReplayGain options
parser.add_argument(
"--target-lufs", "--lufs",
type=int,
default=-18,
help="Target LUFS value for ReplayGain calculation (default: -18)"
)
# Processing options
parser.add_argument(
"--output", "-o",
metavar="DIR",
help="Output directory for processed files (default: modify files in-place)"
)
parser.add_argument(
"--recursive", "-r",
action="store_true",
default=False,
help="Process directories recursively (default: False)"
)
parser.add_argument(
"--backup",
choices=['true', 'false'],
default='true',
help="Create backup files when modifying in-place (default: true)"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be processed without actually modifying files"
)
# Output options
parser.add_argument(
"--quiet", "-q",
action="store_true",
help="Suppress progress messages, only show summary"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Show detailed processing information"
)
# Utility options
parser.add_argument(
"--check-deps",
action="store_true",
help="Check if required dependencies are available and exit"
)
return parser.parse_args()
[docs]
def main():
"""
Main function for the apply loudness tool.
"""
args = parse_arguments()
# Set logging level
if args.quiet:
logger.setLevel(logging.WARNING)
elif args.verbose:
logger.setLevel(logging.DEBUG)
# Handle dependency check
if args.check_deps:
try:
applicator = LoudnessApplicator()
print("FFmpeg and FFprobe are available.")
if args.replaygain:
try:
subprocess.run(['rsgain', '--version'], capture_output=True, check=True)
print("rsgain is available.")
except:
print("Warning: rsgain is not available (required for --replaygain mode)")
sys.exit(1)
sys.exit(0)
except RuntimeError as e:
print(f"Error: {e}")
sys.exit(1)
# Validate arguments
if args.replaygain and not (-30 <= args.target_lufs <= -5):
logger.warning(f"Target LUFS {args.target_lufs} is outside typical range (-30 to -5)")
if args.gain is not None and abs(args.gain) > 20:
logger.warning(f"Large gain adjustment ({args.gain} dB) may cause severe distortion or clipping")
# Resolve settings
recursive = args.recursive
create_backup = args.backup == 'true' and not args.output # No backup needed if using output dir
try:
# Create applicator
applicator = LoudnessApplicator(create_backup=create_backup)
if args.dry_run:
logger.info("DRY RUN MODE - No files will be modified")
else:
# Warning for destructive operations
print("\n" + "="*60)
print("WARNING: DESTRUCTIVE OPERATION")
print("="*60)
print("Applying gain directly to audio files can permanently damage them")
print("and may cause irreversible audio quality loss or clipping.")
print("")
print("This operation will modify your audio files directly.")
if not create_backup:
print("Backup creation is DISABLED - original files will be lost!")
else:
print("Backup files will be created (.backup extension)")
print("")
print("Are you absolutely sure you want to continue?")
print("="*60)
# Get user confirmation
while True:
response = input("Type 'y' to confirm or 'n' to cancel: ").strip().lower()
if response == 'y':
print("Proceeding with gain application...")
break
elif response == 'n':
print("Operation cancelled by user.")
sys.exit(0)
else:
print("Please enter 'y' to confirm or 'n' to cancel.")
# Process all inputs
total_successful = 0
total_processed = 0
for input_path in args.input:
if not os.path.exists(input_path):
logger.error(f"Input does not exist: {input_path}")
continue
if os.path.isfile(input_path):
# Single file
if not applicator.is_supported_file(input_path):
logger.warning(f"Unsupported file: {input_path}")
continue
logger.info(f"Processing file: {os.path.basename(input_path)}")
if args.dry_run:
if args.replaygain:
gain_value = applicator.get_replaygain_value(input_path, args.target_lufs)
if gain_value is not None:
logger.info(f"Would apply {gain_value:+.2f} dB gain (ReplayGain) to {os.path.basename(input_path)}")
else:
logger.warning(f"Could not determine ReplayGain for {os.path.basename(input_path)}")
else:
logger.info(f"Would apply {args.gain:+.2f} dB gain to {os.path.basename(input_path)}")
total_successful += 1
total_processed += 1
else:
successful, processed = applicator.process_files(
[input_path], args.gain, args.replaygain, args.target_lufs, args.output
)
total_successful += successful
total_processed += processed
elif os.path.isdir(input_path):
# Directory
logger.info(f"Processing directory: {input_path}")
if args.dry_run:
# For dry run, just count files
file_count = 0
if recursive:
for root, _, files in os.walk(input_path):
for file in files:
filepath = os.path.join(root, file)
if applicator.is_supported_file(filepath):
file_count += 1
else:
for file in os.listdir(input_path):
filepath = os.path.join(input_path, file)
if applicator.is_supported_file(filepath):
file_count += 1
if args.replaygain:
logger.info(f"Would analyze and apply ReplayGain to {file_count} files")
else:
logger.info(f"Would apply {args.gain:+.2f} dB gain to {file_count} files")
total_successful += file_count
total_processed += file_count
else:
successful, processed = applicator.process_directory(
input_path, recursive, args.gain, args.replaygain, args.target_lufs, args.output
)
total_successful += successful
total_processed += processed
else:
logger.error(f"Input is neither file nor directory: {input_path}")
# Print summary
if args.dry_run:
logger.info(f"Dry run completed: {total_successful}/{total_processed} files would be processed")
else:
logger.info(f"Processing completed: {total_successful}/{total_processed} files processed successfully")
if applicator.backup_count > 0:
logger.info(f"Created {applicator.backup_count} backup files")
if applicator.error_count > 0:
logger.error(f"Encountered {applicator.error_count} errors during processing")
# Exit with error code if there were failures
if total_processed > 0 and total_successful < total_processed:
sys.exit(1)
except Exception as e:
logger.error(f"Error: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()