#!/usr/bin/env python3
"""
ReplayGain LUFS Analyzer
Copyright (c) 2025 TAPS OSS
Project: https://github.com/TAPSOSS/Walrio
Licensed under the BSD-3-Clause License (see LICENSE file for details)
A tool to analyze audio files for ReplayGain values using LUFS (Loudness Units relative to Full Scale).
Uses rsgain tool for analysis and can optionally apply ReplayGain tags to files.
This implementation is inspired by the MuseAmp project by tapscodes.
"""
import os
import sys
import argparse
import subprocess
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import json
# Configure logging format
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('ReplayGainAnalyzer')
# Supported audio file extensions
SUPPORTED_EXTENSIONS = {'.flac', '.mp3', '.m4a'}
# Default LUFS target (ReplayGain 2.0 standard)
DEFAULT_TARGET_LUFS = -18
[docs]
class ReplayGainAnalyzer:
"""
Audio ReplayGain analyzer using rsgain for LUFS analysis
"""
[docs]
def __init__(self, target_lufs: int = DEFAULT_TARGET_LUFS):
"""
Initialize the ReplayGain analyzer.
Args:
target_lufs (int): Target LUFS value for analysis (default: -18)
"""
self.target_lufs = target_lufs
self.analyzed_count = 0
self.error_count = 0
self.tagged_count = 0
# Validate rsgain availability
self._check_rsgain()
def _check_rsgain(self):
"""
Check if rsgain is available for analysis.
Raises:
RuntimeError: If rsgain is not found.
"""
try:
result = subprocess.run(
['rsgain', '--version'],
capture_output=True,
text=True,
check=True
)
logger.debug("rsgain is available for ReplayGain analysis")
except (subprocess.CalledProcessError, FileNotFoundError):
raise RuntimeError(
"rsgain not found. Please install rsgain from https://github.com/complexlogic/rsgain and make sure it's in your PATH."
)
[docs]
def is_supported_file(self, filepath: str) -> bool:
"""
Check if a file is a supported audio file.
Args:
filepath (str): Path to the file
Returns:
bool: True if the file is supported, False otherwise
"""
if not os.path.isfile(filepath):
return False
ext = Path(filepath).suffix.lower()
return ext in SUPPORTED_EXTENSIONS
[docs]
def analyze_file(self, filepath: str) -> Optional[Dict[str, Any]]:
"""
Analyze a single audio file for ReplayGain values using rsgain.
Args:
filepath (str): Path to the audio file
Returns:
dict or None: Analysis results containing loudness, gain, and clipping info, or None if analysis failed
"""
if not self.is_supported_file(filepath):
logger.warning(f"Unsupported file type: {os.path.basename(filepath)}")
return None
try:
# Use rsgain custom command for analysis without writing tags
cmd = [
"rsgain", "custom",
"-O", # Output format: tab-separated values
str(filepath)
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
logger.error(f"rsgain analysis failed for {os.path.basename(filepath)}: {result.stderr or result.stdout}")
self.error_count += 1
return None
# Parse the output
lines = result.stdout.strip().splitlines()
if len(lines) < 2:
logger.error(f"Unexpected rsgain output format for {os.path.basename(filepath)}")
self.error_count += 1
return None
# Parse header and values
header = lines[0].split('\t')
values = lines[1].split('\t')
if len(header) != len(values):
logger.error(f"Header/value mismatch in rsgain output for {os.path.basename(filepath)}")
self.error_count += 1
return None
# Create column mapping
colmap = {k: i for i, k in enumerate(header)}
# Extract values
analysis_result = {
'filepath': filepath,
'filename': os.path.basename(filepath),
'loudness_lufs': None,
'gain_db': None,
'clipping': None,
'raw_output': result.stdout.strip()
}
# Get loudness value
lufs_col = colmap.get("Loudness (LUFS)", -1)
if lufs_col != -1 and lufs_col < len(values):
try:
analysis_result['loudness_lufs'] = float(values[lufs_col])
except ValueError:
analysis_result['loudness_lufs'] = values[lufs_col] # Keep as string if not numeric
# Get gain value
gain_col = colmap.get("Gain (dB)", -1)
if gain_col != -1 and gain_col < len(values):
try:
analysis_result['gain_db'] = float(values[gain_col])
except ValueError:
analysis_result['gain_db'] = values[gain_col] # Keep as string if not numeric
# Get clipping information
clip_col = colmap.get("Clipping", colmap.get("Clipping Adjustment?", -1))
if clip_col != -1 and clip_col < len(values):
clip_val = values[clip_col].strip().upper()
if clip_val in ("Y", "YES"):
analysis_result['clipping'] = True
elif clip_val in ("N", "NO"):
analysis_result['clipping'] = False
else:
analysis_result['clipping'] = values[clip_col] # Keep original value
self.analyzed_count += 1
logger.debug(f"Analyzed {os.path.basename(filepath)}: {analysis_result['loudness_lufs']} LUFS, {analysis_result['gain_db']} dB gain")
return analysis_result
except Exception as e:
logger.error(f"Error analyzing {os.path.basename(filepath)}: {str(e)}")
self.error_count += 1
return None
[docs]
def analyze_and_tag_file(self, filepath: str, skip_tagged: bool = True) -> Optional[Dict[str, Any]]:
"""
Analyze a file and optionally apply ReplayGain tags.
Args:
filepath (str): Path to the audio file
skip_tagged (bool): If True, skip files that already have ReplayGain tags
Returns:
dict or None: Analysis results, or None if analysis failed
"""
if not self.is_supported_file(filepath):
logger.warning(f"Unsupported file type: {os.path.basename(filepath)}")
return None
try:
# Build rsgain command for analysis and tagging (following MuseAmp pattern)
lufs_str = f"-{abs(self.target_lufs)}" # MuseAmp pattern: always negative
cmd = [
"rsgain", "custom",
"-s", "i", # Apply tags (single file mode, integrated mode)
"-l", lufs_str, # Target LUFS
"-O", # Output format: tab-separated values
str(filepath)
]
# Add skip option if requested
if skip_tagged:
cmd.insert(2, "-S") # Skip files with existing ReplayGain tags
logger.debug(f"Running command: {' '.join(cmd)}") # Debug the exact command
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
logger.error(f"rsgain tagging failed for {os.path.basename(filepath)}: {result.stderr or result.stdout}")
self.error_count += 1
return None
# Parse the output (same format as analyze_file)
lines = result.stdout.strip().splitlines()
if len(lines) < 2:
logger.error(f"Unexpected rsgain output format for {os.path.basename(filepath)}")
self.error_count += 1
return None
# Parse header and values
header = lines[0].split('\t')
values = lines[1].split('\t')
logger.debug(f"Header: {header}") # Debug output
logger.debug(f"Values: {values}") # Debug output
if len(header) != len(values):
logger.error(f"Header/value mismatch in rsgain output for {os.path.basename(filepath)}")
self.error_count += 1
return None
# Create column mapping
colmap = {k: i for i, k in enumerate(header)}
# Extract values
analysis_result = {
'filepath': filepath,
'filename': os.path.basename(filepath),
'loudness_lufs': None,
'gain_db': None,
'clipping': None,
'tagged': True,
'target_lufs': self.target_lufs,
'raw_output': result.stdout.strip()
}
# Get loudness value
lufs_col = colmap.get("Loudness (LUFS)", -1)
if lufs_col != -1 and lufs_col < len(values):
try:
analysis_result['loudness_lufs'] = float(values[lufs_col])
except ValueError:
analysis_result['loudness_lufs'] = values[lufs_col]
# Get gain value
gain_col = colmap.get("Gain (dB)", -1)
if gain_col != -1 and gain_col < len(values):
try:
analysis_result['gain_db'] = float(values[gain_col])
except ValueError:
analysis_result['gain_db'] = values[gain_col]
# Get clipping information
clip_col = colmap.get("Clipping", colmap.get("Clipping Adjustment?", -1))
if clip_col != -1 and clip_col < len(values):
clip_val = values[clip_col].strip().upper()
if clip_val in ("Y", "YES"):
analysis_result['clipping'] = True
elif clip_val in ("N", "NO"):
analysis_result['clipping'] = False
else:
analysis_result['clipping'] = values[clip_col]
self.analyzed_count += 1
self.tagged_count += 1
logger.info(f"Tagged {os.path.basename(filepath)}: {analysis_result['loudness_lufs']} LUFS, {analysis_result['gain_db']} dB gain")
return analysis_result
except Exception as e:
logger.error(f"Error analyzing and tagging {os.path.basename(filepath)}: {str(e)}")
self.error_count += 1
return None
[docs]
def analyze_directory(self, directory: str, recursive: bool = True, analyze_only: bool = True) -> List[Dict[str, Any]]:
"""
Analyze all supported audio files in a directory.
Args:
directory (str): Directory to analyze
recursive (bool): If True, process subdirectories recursively
analyze_only (bool): If True, only analyze without tagging
Returns:
list: List of analysis results for all processed files
"""
if not os.path.isdir(directory):
logger.error(f"Directory does not exist: {directory}")
return []
# Find all supported audio files
files_to_process = []
if recursive:
for root, _, files in os.walk(directory):
for file in files:
filepath = os.path.join(root, file)
if self.is_supported_file(filepath):
files_to_process.append(filepath)
else:
for file in os.listdir(directory):
filepath = os.path.join(directory, file)
if self.is_supported_file(filepath):
files_to_process.append(filepath)
if not files_to_process:
logger.warning(f"No supported audio files found in {directory}")
return []
logger.info(f"Found {len(files_to_process)} supported audio files")
# Process all files
results = []
for i, filepath in enumerate(files_to_process, 1):
logger.debug(f"Processing file {i}/{len(files_to_process)}: {os.path.basename(filepath)}")
if analyze_only:
result = self.analyze_file(filepath)
else:
result = self.analyze_and_tag_file(filepath)
if result:
results.append(result)
return results
[docs]
def print_analysis_summary(self, results: List[Dict[str, Any]], detailed: bool = False):
"""
Print a summary of analysis results.
Args:
results (list): List of analysis results
detailed (bool): If True, print detailed per-file results
"""
if not results:
print("No analysis results to display.")
return
print(f"\nReplayGain Analysis Summary")
print("=" * 60)
print(f"Target LUFS: {self.target_lufs}")
print(f"Files analyzed: {len(results)}")
print(f"Files with errors: {self.error_count}")
# Calculate statistics
valid_loudness = [r['loudness_lufs'] for r in results if isinstance(r['loudness_lufs'], (int, float))]
valid_gain = [r['gain_db'] for r in results if isinstance(r['gain_db'], (int, float))]
clipping_files = [r for r in results if r['clipping'] is True]
if valid_loudness:
avg_loudness = sum(valid_loudness) / len(valid_loudness)
min_loudness = min(valid_loudness)
max_loudness = max(valid_loudness)
print(f"Average loudness: {avg_loudness:.2f} LUFS")
print(f"Loudness range: {min_loudness:.2f} to {max_loudness:.2f} LUFS")
if valid_gain:
avg_gain = sum(valid_gain) / len(valid_gain)
min_gain = min(valid_gain)
max_gain = max(valid_gain)
print(f"Average gain: {avg_gain:.2f} dB")
print(f"Gain range: {min_gain:.2f} to {max_gain:.2f} dB")
if clipping_files:
print(f"Files with clipping: {len(clipping_files)}")
# Detailed per-file results
if detailed:
print(f"\nDetailed Results:")
print("-" * 60)
for result in results:
filename = result['filename']
loudness = result.get('loudness_lufs', 'N/A')
gain = result.get('gain_db', 'N/A')
clipping = result.get('clipping', 'N/A')
# Format values
if isinstance(loudness, (int, float)):
loudness_str = f"{loudness:.2f} LUFS"
else:
loudness_str = str(loudness)
if isinstance(gain, (int, float)):
gain_str = f"{gain:.2f} dB"
else:
gain_str = str(gain)
if isinstance(clipping, bool):
clipping_str = "Yes" if clipping else "No"
else:
clipping_str = str(clipping)
tagged_str = " [TAGGED]" if result.get('tagged', False) else ""
print(f"{filename:<40} {loudness_str:<12} {gain_str:<10} Clipping: {clipping_str}{tagged_str}")
[docs]
def parse_arguments():
"""
Parse command line arguments.
Returns:
argparse.Namespace: Parsed arguments
"""
parser = argparse.ArgumentParser(
description="ReplayGain LUFS Analyzer - Analyze and tag audio files with ReplayGain values",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""Examples:
# Analyze files without tagging (dry run)
python replaygain.py /path/to/music --analyze-only
# Analyze and tag files with default -18 LUFS target
python replaygain.py /path/to/music --tag
# Use custom LUFS target
python replaygain.py /path/to/music --tag --target-lufs -16
# Analyze specific files
python replaygain.py song1.flac song2.mp3 --analyze-only
# Show detailed per-file results
python replaygain.py /path/to/music --analyze-only --detailed
# Tag files but skip those already tagged
python replaygain.py /path/to/music --tag --skip-tagged
# Force retag all files (default behavior - don't use --skip-tagged)
python replaygain.py /path/to/music --tag
Supported file formats:
- FLAC (.flac)
- MP3 (.mp3)
- M4A (.m4a)
LUFS Target Values:
-18 LUFS: ReplayGain 2.0 standard (default)
-16 LUFS: Apple Music standard
-14 LUFS: Spotify, Amazon Music, YouTube standard
-20 LUFS: TV broadcast standard
Requirements:
- rsgain tool (https://github.com/complexlogic/rsgain)
""")
# Input options
parser.add_argument(
"input",
nargs="+",
help="Audio files or directories to analyze"
)
# Operation mode
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument(
"--analyze-only", "--analyze",
action="store_true",
help="Only analyze files without applying ReplayGain tags"
)
mode_group.add_argument(
"--tag",
action="store_true",
help="Analyze files and apply ReplayGain tags"
)
# Analysis options
parser.add_argument(
"--target-lufs", "--lufs",
type=int,
default=DEFAULT_TARGET_LUFS,
help=f"Target LUFS value for ReplayGain calculation (default: {DEFAULT_TARGET_LUFS})"
)
parser.add_argument(
"--skip-tagged",
action="store_true",
default=False,
help="Skip files that already have ReplayGain tags (default: False - process all files)"
)
# Directory processing options
parser.add_argument(
"--recursive", "-r",
action="store_true",
default=False,
help="Process directories recursively (default: False)"
)
# Output options
parser.add_argument(
"--detailed", "--verbose",
action="store_true",
help="Show detailed per-file analysis results"
)
parser.add_argument(
"--quiet", "-q",
action="store_true",
help="Suppress progress messages, only show summary"
)
parser.add_argument(
"--json",
metavar="FILE",
help="Save analysis results to JSON file"
)
# Utility options
parser.add_argument(
"--check-rsgain",
action="store_true",
help="Check if rsgain is available and exit"
)
return parser.parse_args()
[docs]
def main():
"""
Main function for the ReplayGain analyzer.
"""
args = parse_arguments()
# Set logging level
if args.quiet:
logger.setLevel(logging.WARNING)
# Handle rsgain check
if args.check_rsgain:
try:
analyzer = ReplayGainAnalyzer()
print("rsgain is available and ready for use.")
sys.exit(0)
except RuntimeError as e:
print(f"Error: {e}")
sys.exit(1)
# Use skip-tagged setting directly
skip_tagged = args.skip_tagged
# Use recursive setting directly
recursive = args.recursive
# Validate target LUFS range
if not (-30 <= args.target_lufs <= -5):
logger.warning(f"Target LUFS {args.target_lufs} is outside typical range (-30 to -5). This may not be optimal.")
try:
# Create analyzer
analyzer = ReplayGainAnalyzer(target_lufs=args.target_lufs)
# Process all inputs
all_results = []
for input_path in args.input:
if not os.path.exists(input_path):
logger.error(f"Input does not exist: {input_path}")
continue
if os.path.isfile(input_path):
# Single file
if not analyzer.is_supported_file(input_path):
logger.warning(f"Unsupported file: {input_path}")
continue
logger.info(f"Processing file: {os.path.basename(input_path)}")
if args.analyze_only:
result = analyzer.analyze_file(input_path)
else:
result = analyzer.analyze_and_tag_file(input_path, skip_tagged)
if result:
all_results.append(result)
elif os.path.isdir(input_path):
# Directory
logger.info(f"Processing directory: {input_path}")
if args.analyze_only:
# Use the analyze_directory method for analysis-only mode
results = analyzer.analyze_directory(input_path, recursive, args.analyze_only)
else:
# For tagging mode, process each file individually with proper settings
results = []
for root, _, files in os.walk(input_path) if recursive else [(input_path, [], os.listdir(input_path))]:
for file in files:
filepath = os.path.join(root, file)
if analyzer.is_supported_file(filepath):
result = analyzer.analyze_and_tag_file(filepath, skip_tagged)
if result:
results.append(result)
all_results.extend(results)
else:
logger.error(f"Input is neither file nor directory: {input_path}")
# Print summary
if not args.quiet:
analyzer.print_analysis_summary(all_results, args.detailed)
# Save JSON results if requested
if args.json:
try:
with open(args.json, 'w') as f:
json.dump(all_results, f, indent=2, default=str)
logger.info(f"Analysis results saved to: {args.json}")
except Exception as e:
logger.error(f"Failed to save JSON results: {e}")
# Exit with error code if there were errors
if analyzer.error_count > 0:
logger.error(f"Analysis completed with {analyzer.error_count} errors")
sys.exit(1)
# Success summary
operation = "analyzed" if args.analyze_only else "analyzed and tagged"
logger.info(f"Successfully {operation} {analyzer.analyzed_count} files")
if not args.analyze_only:
logger.info(f"Applied ReplayGain tags to {analyzer.tagged_count} files")
except Exception as e:
logger.error(f"Error: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()