mirror of
https://github.com/YuzuZensai/spleeter.git
synced 2026-01-31 14:58:23 +00:00
🎨 finalizes audio package
This commit is contained in:
@@ -8,76 +8,92 @@
|
||||
used within this library.
|
||||
"""
|
||||
|
||||
import datetime as dt
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional, Union
|
||||
|
||||
from . import Codec
|
||||
from .adapter import AudioAdapter
|
||||
from .. import SpleeterError
|
||||
from ..types import Signal
|
||||
from ..utils.logging import get_logger
|
||||
|
||||
# pyright: reportMissingImports=false
|
||||
# pylint: disable=import-error
|
||||
import ffmpeg
|
||||
import numpy as np
|
||||
# pylint: enable=import-error
|
||||
|
||||
from .adapter import AudioAdapter
|
||||
from .. import SpleeterError
|
||||
from ..utils.logging import get_logger
|
||||
|
||||
__email__ = 'spleeter@deezer.com'
|
||||
__author__ = 'Deezer Research'
|
||||
__license__ = 'MIT License'
|
||||
|
||||
|
||||
def _check_ffmpeg_install():
|
||||
""" Ensure FFMPEG binaries are available.
|
||||
|
||||
:raise SpleeterError: If ffmpeg or ffprobe is not found.
|
||||
"""
|
||||
for binary in ('ffmpeg', 'ffprobe'):
|
||||
if shutil.which(binary) is None:
|
||||
raise SpleeterError('{} binary not found'.format(binary))
|
||||
|
||||
|
||||
def _to_ffmpeg_time(n):
|
||||
""" Format number of seconds to time expected by FFMPEG.
|
||||
:param n: Time in seconds to format.
|
||||
:returns: Formatted time in FFMPEG format.
|
||||
"""
|
||||
m, s = divmod(n, 60)
|
||||
h, m = divmod(m, 60)
|
||||
return '%d:%02d:%09.6f' % (h, m, s)
|
||||
|
||||
|
||||
def _to_ffmpeg_codec(codec):
|
||||
ffmpeg_codecs = {
|
||||
'm4a': 'aac',
|
||||
'ogg': 'libvorbis',
|
||||
'wma': 'wmav2',
|
||||
}
|
||||
return ffmpeg_codecs.get(codec) or codec
|
||||
|
||||
|
||||
class FFMPEGProcessAudioAdapter(AudioAdapter):
|
||||
""" An AudioAdapter implementation that use FFMPEG binary through
|
||||
subprocess in order to perform I/O operation for audio processing.
|
||||
|
||||
When created, FFMPEG binary path will be checked and expended,
|
||||
raising exception if not found. Such path could be infered using
|
||||
FFMPEG_PATH environment variable.
|
||||
"""
|
||||
An AudioAdapter implementation that use FFMPEG binary through
|
||||
subprocess in order to perform I/O operation for audio processing.
|
||||
|
||||
When created, FFMPEG binary path will be checked and expended,
|
||||
raising exception if not found. Such path could be infered using
|
||||
`FFMPEG_PATH` environment variable.
|
||||
"""
|
||||
|
||||
SUPPORTED_CODECS: Dict[Codec, str] = {
|
||||
Codec.M4A: 'aac',
|
||||
Codec.OGG: 'libvorbis',
|
||||
Codec.WMA: 'wmav2'
|
||||
}
|
||||
""" FFMPEG codec name mapping. """
|
||||
|
||||
def __init__(_) -> None:
|
||||
"""
|
||||
Default constructor, ensure FFMPEG binaries are available.
|
||||
|
||||
Raises:
|
||||
SpleeterError:
|
||||
If ffmpeg or ffprobe is not found.
|
||||
"""
|
||||
for binary in ('ffmpeg', 'ffprobe'):
|
||||
if shutil.which(binary) is None:
|
||||
raise SpleeterError('{} binary not found'.format(binary))
|
||||
|
||||
def load(
|
||||
self, path, offset=None, duration=None,
|
||||
sample_rate=None, dtype=np.float32):
|
||||
""" Loads the audio file denoted by the given path
|
||||
and returns it data as a waveform.
|
||||
|
||||
:param path: Path of the audio file to load data from.
|
||||
:param offset: (Optional) Start offset to load from in seconds.
|
||||
:param duration: (Optional) Duration to load in seconds.
|
||||
:param sample_rate: (Optional) Sample rate to load audio with.
|
||||
:param dtype: (Optional) Numpy data type to use, default to float32.
|
||||
:returns: Loaded data a (waveform, sample_rate) tuple.
|
||||
:raise SpleeterError: If any error occurs while loading audio.
|
||||
_,
|
||||
path: Union[Path, str],
|
||||
offset: Optional[float] = None,
|
||||
duration: Optional[float] = None,
|
||||
sample_rate: Optional[float] = None,
|
||||
dtype: np.dtype = np.float32) -> Signal:
|
||||
"""
|
||||
_check_ffmpeg_install()
|
||||
Loads the audio file denoted by the given path
|
||||
and returns it data as a waveform.
|
||||
|
||||
Parameters:
|
||||
path (Union[Path, str]:
|
||||
Path of the audio file to load data from.
|
||||
offset (Optional[float]):
|
||||
Start offset to load from in seconds.
|
||||
duration (Optional[float]):
|
||||
Duration to load in seconds.
|
||||
sample_rate (Optional[float]):
|
||||
Sample rate to load audio with.
|
||||
dtype (numpy.dtype):
|
||||
(Optional) Numpy data type to use, default to `float32`.
|
||||
|
||||
Returns:
|
||||
Signal:
|
||||
Loaded data a (waveform, sample_rate) tuple.
|
||||
|
||||
Raises:
|
||||
SpleeterError:
|
||||
If any error occurs while loading audio.
|
||||
"""
|
||||
if isinstance(path, Path):
|
||||
path = str(path)
|
||||
if not isinstance(path, str):
|
||||
path = path.decode()
|
||||
try:
|
||||
@@ -97,9 +113,9 @@ class FFMPEGProcessAudioAdapter(AudioAdapter):
|
||||
sample_rate = metadata['sample_rate']
|
||||
output_kwargs = {'format': 'f32le', 'ar': sample_rate}
|
||||
if duration is not None:
|
||||
output_kwargs['t'] = _to_ffmpeg_time(duration)
|
||||
output_kwargs['t'] = str(dt.timedelta(seconds=duration))
|
||||
if offset is not None:
|
||||
output_kwargs['ss'] = _to_ffmpeg_time(offset)
|
||||
output_kwargs['ss'] = str(dt.timedelta(seconds=offset))
|
||||
process = (
|
||||
ffmpeg
|
||||
.input(path)
|
||||
@@ -112,29 +128,46 @@ class FFMPEGProcessAudioAdapter(AudioAdapter):
|
||||
return (waveform, sample_rate)
|
||||
|
||||
def save(
|
||||
self, path, data, sample_rate,
|
||||
codec=None, bitrate=None):
|
||||
""" Write waveform data to the file denoted by the given path
|
||||
using FFMPEG process.
|
||||
|
||||
:param path: Path of the audio file to save data in.
|
||||
:param data: Waveform data to write.
|
||||
:param sample_rate: Sample rate to write file in.
|
||||
:param codec: (Optional) Writing codec to use.
|
||||
:param bitrate: (Optional) Bitrate of the written audio file.
|
||||
:raise IOError: If any error occurs while using FFMPEG to write data.
|
||||
self,
|
||||
path: Union[Path, str],
|
||||
data: np.ndarray,
|
||||
sample_rate: float,
|
||||
codec: Codec = None,
|
||||
bitrate: str = None) -> None:
|
||||
"""
|
||||
_check_ffmpeg_install()
|
||||
Write waveform data to the file denoted by the given path using
|
||||
FFMPEG process.
|
||||
|
||||
Parameters:
|
||||
path (Union[Path, str]):
|
||||
Path like of the audio file to save data in.
|
||||
data (numpy.ndarray):
|
||||
Waveform data to write.
|
||||
sample_rate (float):
|
||||
Sample rate to write file in.
|
||||
codec ():
|
||||
(Optional) Writing codec to use, default to `None`.
|
||||
bitrate (str):
|
||||
(Optional) Bitrate of the written audio file, default to
|
||||
`None`.
|
||||
|
||||
Raises:
|
||||
IOError:
|
||||
If any error occurs while using FFMPEG to write data.
|
||||
"""
|
||||
if isinstance(path, Path):
|
||||
path = str(path)
|
||||
directory = os.path.dirname(path)
|
||||
if not os.path.exists(directory):
|
||||
raise SpleeterError(f'output directory does not exists: {directory}')
|
||||
get_logger().debug('Writing file %s', path)
|
||||
raise SpleeterError(
|
||||
f'output directory does not exists: {directory}')
|
||||
get_logger().debug(f'Writing file {path}')
|
||||
input_kwargs = {'ar': sample_rate, 'ac': data.shape[1]}
|
||||
output_kwargs = {'ar': sample_rate, 'strict': '-2'}
|
||||
if bitrate:
|
||||
output_kwargs['audio_bitrate'] = bitrate
|
||||
if codec is not None and codec != 'wav':
|
||||
output_kwargs['codec'] = _to_ffmpeg_codec(codec)
|
||||
output_kwargs['codec'] = self.SUPPORTED_CODECS.get(codec, codec)
|
||||
process = (
|
||||
ffmpeg
|
||||
.input('pipe:', format='f32le', **input_kwargs)
|
||||
@@ -147,4 +180,4 @@ class FFMPEGProcessAudioAdapter(AudioAdapter):
|
||||
process.wait()
|
||||
except IOError:
|
||||
raise SpleeterError(f'FFMPEG error: {process.stderr.read()}')
|
||||
get_logger().info('File %s written succesfully', path)
|
||||
get_logger().info(f'File {path} written succesfully')
|
||||
|
||||
Reference in New Issue
Block a user