praatio.audio

Functions for reading, writing, querying, and manipulating audio.

see examples/anonymize_recording.py, examples/delete_vowels.py, and examples/extract_subwavs.py

  1"""
  2Functions for reading, writing, querying, and manipulating audio.
  3
  4see **examples/anonymize_recording.py**, **examples/delete_vowels.py**,
  5and **examples/extract_subwavs.py**
  6"""
  7
  8import math
  9import wave
 10import struct
 11import copy
 12from typing import List, Tuple, Optional, Callable
 13from abc import ABC, abstractmethod
 14from functools import partial
 15
 16from typing_extensions import Final
 17
 18from praatio.utilities import errors
 19from praatio.utilities import utils
 20
# Maps a sample width in bytes to the struct format character that is used
# to pack/unpack one sample of that width (see convertFromBytes and
# convertToBytes).
sampleWidthDict: Final = {1: "b", 2: "h", 4: "i", 8: "q"}

# Labels attached to each interval by _computeKeepDeleteIntervals
_KEEP: Final = "keep"
_DELETE: Final = "delete"

# Default 'timeStep' (in seconds) of AbstractWav.findNearestZeroCrossing
ZERO_CROSSING_TIMESTEP: Final = 0.002
# NOTE(review): presumably in Hz; not referenced elsewhere in this listing
DEFAULT_SINE_FREQUENCY: Final = 200
# Converts a sample width in bytes to a width in bits
NUM_BITS_IN_A_BYTE: Final = 8
 29
 30
 31def calculateMaxAmplitude(sampleWidth: int) -> int:
 32    """Gets the largest possible amplitude representable by a given sample width
 33
 34    The formula is 2^(n-1) - 1 where n is the number of bits
 35    - the first -1 is because the result is signed
 36    - the second -1 is because the value is 0 based
 37    e.g. if n=3 then 2^(3-1)-1 => 3
 38         if n=4 then 2^(4-1)-1 => 7
 39
 40    Args:
 41        sampleWidth: the width in bytes of a sample in the wave file
 42
 43    Returns:
 44        An integer
 45    """
 46    return 2 ** (sampleWidth * NUM_BITS_IN_A_BYTE - 1) - 1
 47
 48
 49def convertFromBytes(byteStr: bytes, sampleWidth: int) -> Tuple[int, ...]:
 50    """Convert frames of a python wave object from bytes to numbers"""
 51    byteCode = sampleWidthDict[sampleWidth]
 52    actualNumFrames = int(len(byteStr) / float(sampleWidth))
 53    audioFrameList = struct.unpack("<" + byteCode * actualNumFrames, byteStr)
 54
 55    return audioFrameList
 56
 57
 58def convertToBytes(numList: Tuple[int, ...], sampleWidth: int) -> bytes:
 59    """Convert frames of a python wave object from numbers to bytes"""
 60    byteCode = sampleWidthDict[sampleWidth]
 61    byteStr = struct.pack("<" + byteCode * len(numList), *numList)
 62
 63    return byteStr
 64
 65
 66def extractSubwav(fn: str, outputFN: str, startTime: float, endTime: float) -> None:
 67    """Get a subsegment of an audio file"""
 68    wav = QueryWav(fn)
 69    frames = wav.getFrames(startTime, endTime)
 70    wav.outputFrames(frames, outputFN)
 71
 72
 73def getDuration(fn: str) -> float:
 74    """Get the total duration of an audio file"""
 75    return QueryWav(fn).duration
 76
 77
 78def readFramesAtTime(
 79    audiofile: wave.Wave_read, startTime: float, endTime: float
 80) -> bytes:
 81    """Read the audio frames for the specified internal of an audio file"""
 82    params = audiofile.getparams()
 83    frameRate = params[2]
 84
 85    audiofile.setpos(round(frameRate * startTime))
 86    frames = audiofile.readframes(round(frameRate * (endTime - startTime)))
 87
 88    return frames
 89
 90
 91def readFramesAtTimes(
 92    audiofile: wave.Wave_read,
 93    keepIntervals: List[Tuple[float, float]] = None,
 94    deleteIntervals: List[Tuple[float, float]] = None,
 95    replaceFunc: Optional[Callable[[float], bytes]] = None,
 96) -> bytes:
 97    """Reads an audio file into memory, with some configuration
 98
 99    Args:
100        audiofile: the time to get the interval from
101        keepIntervals: duration of the interval
102        deleteIntervals: the maximum allowed time
103        replaceFunc: is the interval before or after the targetTime?
104
105    Returns:
106        A bytestring of the loaded audio file
107
108    Raises:
109        ArgumentError: The timestamps in keepIntervals or deleteIntervals exceed the audio duration
110        ArgumentError: Only one of keepIntervals and deleteIntervals can be specified
111    """
112    params = audiofile.getparams()
113    frameRate = params[2]
114    nframes = params[3]
115
116    duration = nframes / float(frameRate)
117    markedIntervals = _computeKeepDeleteIntervals(
118        0.0, duration, keepIntervals, deleteIntervals
119    )
120
121    if markedIntervals[-1][1] > duration:
122        raise errors.ArgumentError(
123            "Timestamps in keepIntervals and deleteIntervals cannot exceed wav file duration"
124        )
125
126    # Grab the sections to be kept
127    audioFrames: bytes = b""
128    for start, end, label in markedIntervals:
129        if label == _KEEP:
130            audioFrames += readFramesAtTime(audiofile, start, end)
131
132        # If we are not keeping a region and we're not shrinking the
133        # duration, fill in the deleted portions with zeros
134        elif label == _DELETE and replaceFunc:
135            audioFrames += replaceFunc(end - start)
136
137    return audioFrames
138
139
class AbstractWav(ABC):
    """Base class for single-channel wave audio

    Subclasses supply 'duration', 'getFrames' and 'getSamples'; this class
    builds the zero-crossing search and file output on top of them.
    """

    def __init__(self, params: List):
        """
        Args:
            params: the wave parameters, indexed as (nchannels, sampleWidth,
                frameRate, nframes, comptype, compname)

        Raises:
            ArgumentError: the audio does not have exactly one channel
        """
        self.params = params

        self.nchannels: int = params[0]
        self.sampleWidth: int = params[1]
        self.frameRate: int = params[2]
        self.nframes: int = params[3]
        self.comptype = params[4]
        self.compname = params[5]

        if self.nchannels != 1:
            raise errors.ArgumentError(
                "Only audio with a single channel can be loaded. "
                f"Your file has {self.nchannels} channels."
            )

    def _iterZeroCrossings(
        self,
        start: float,
        withinThreshold,
        step: float,
        reverse: bool,
    ) -> Optional[float]:
        """Search a single window of audio for a zero crossing

        Args:
            start: the time the window is anchored at
            withinThreshold: a predicate on 'start'; when it returns a falsy
                value no search is done
            step: the length of the window, in seconds
            reverse: passed through to utils.getInterval and to
                _findNextZeroCrossing to select the search direction

        Returns:
            The time of the zero crossing, or None if the window was not
            searched or contains no zero crossing
        """
        if not withinThreshold(start):
            return None

        startTime, endTime = utils.getInterval(start, step, self.duration, reverse)
        samples = self.getSamples(startTime, endTime)

        return _findNextZeroCrossing(startTime, samples, self.frameRate, reverse)

    @property
    @abstractmethod
    def duration(self) -> float:  # pragma: no cover
        """The duration of the audio, in seconds"""
        pass

    def findNearestZeroCrossing(
        self, targetTime: float, timeStep: float = ZERO_CROSSING_TIMESTEP
    ) -> float:
        """Finds the nearest zero crossing at the given time in an audio file

        Looks both before and after the timeStamp, widening the search by
        timeStep in each direction until a zero crossing is found.

        Args:
            targetTime: the time to search around, in seconds
            timeStep: the size of each search window, in seconds

        Returns:
            The time of the zero crossing that utils.chooseClosestTime
            selects from the candidates found before and after targetTime

        Raises:
            ArgumentError: timeStep spans fewer than two samples
            FindZeroCrossingError: no zero crossing could be found
        """

        leftStartTime = rightStartTime = targetTime

        samplesPerStep = timeStep * self.frameRate
        if samplesPerStep < 2:
            raise errors.ArgumentError(
                f"'timeStep' ({timeStep}) must be large enough to contain "
                f"multiple samples for audio framerate ({self.frameRate})"
            )

        # Find zero crossings
        smallestLeft = None
        smallestRight = None
        oneSampleDuration = 1 / self.frameRate
        while True:
            # Increasing our timeStep by one sample enables
            # us to find zero-crossings that sit at the boundary
            # of two samples (two different iterations of this loop)
            smallestLeft = self._iterZeroCrossings(
                leftStartTime, lambda x: x > 0, timeStep + oneSampleDuration, True
            )
            smallestRight = self._iterZeroCrossings(
                rightStartTime,
                lambda x: x + timeStep < self.duration,
                timeStep + oneSampleDuration,
                False,
            )

            if smallestLeft is not None or smallestRight is not None:
                break
            # Both search fronts have run off the ends of the audio
            elif leftStartTime < 0 and rightStartTime > self.duration:
                raise errors.FindZeroCrossingError(0, self.duration)
            else:
                # oneSampleDuration is not added here, so consecutive
                # windows overlap by one sample
                leftStartTime -= timeStep
                rightStartTime += timeStep

        # The loop only exits via 'break' when a crossing was found, so this
        # is a safeguard that should not trigger
        if smallestLeft is None and smallestRight is None:
            raise errors.FindZeroCrossingError(0, self.duration)

        return utils.chooseClosestTime(targetTime, smallestLeft, smallestRight)

    @abstractmethod
    def getFrames(self, startTime: float, endTime: float) -> bytes:  # pragma: no cover
        """Get the raw bytes of the audio between startTime and endTime"""
        pass

    @abstractmethod
    def getSamples(
        self, startTime: float, endTime: float
    ) -> Tuple[int, ...]:  # pragma: no cover
        """Get the audio between startTime and endTime as integer samples"""
        pass

    def outputFrames(self, frames: bytes, outputFN: str) -> None:
        """Output frames using the same parameters as this Wav

        Args:
            frames: the raw bytes of the audio to write
            outputFN: the path of the wave file to write
        """
        # The context manager closes the file, which guarantees that the
        # data is flushed and the header is finalized
        with wave.open(outputFN, "w") as outWave:
            outWave.setparams(
                [
                    self.nchannels,
                    self.sampleWidth,
                    self.frameRate,
                    # nframes counts frames, not bytes
                    len(frames) // self.sampleWidth,
                    self.comptype,
                    self.compname,
                ]
            )
            outWave.writeframes(frames)
254
255
class QueryWav(AbstractWav):
    """A class for getting information about a wave file

    The wave file is never loaded--we only keep a reference to the
    file descriptor.  All operations on QueryWavs are fast.
    QueryWavs don't (shouldn't) change state.  For doing
    multiple modifications, use a Wav.

    The underlying file stays open for the lifetime of the object; call
    close() or use the QueryWav as a context manager to release it.
    """

    def __init__(self, fn: str):
        """
        Args:
            fn: the path of the wave file to open

        Raises:
            ArgumentError: the audio does not have exactly one channel
        """
        self.audiofile = wave.open(fn, "r")
        try:
            super(QueryWav, self).__init__(self.audiofile.getparams())
        except Exception:
            # Don't leave the file open if the audio was rejected
            self.audiofile.close()
            raise

    def __enter__(self) -> "QueryWav":
        return self

    def __exit__(self, excType, excValue, traceback) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying wave file"""
        self.audiofile.close()

    @property
    def duration(self) -> float:
        """The duration of the audio, in seconds"""
        return float(self.nframes) / self.frameRate

    def getFrames(
        self, startTime: Optional[float] = None, endTime: Optional[float] = None
    ) -> bytes:
        """Read the raw bytes of the audio in the given interval

        Args:
            startTime: the start of the interval, in seconds; defaults to
                the start of the file
            endTime: the end of the interval, in seconds; defaults to the
                end of the file

        Returns:
            The raw bytes of the frames in the interval
        """
        if startTime is None:
            startTime = 0

        if endTime is None:
            endTime = self.duration

        return readFramesAtTime(self.audiofile, startTime, endTime)

    def getSamples(self, startTime: float, endTime: float) -> Tuple[int, ...]:
        """Read the audio in the given interval as integer samples"""
        frames = self.getFrames(startTime, endTime)
        return convertFromBytes(frames, self.sampleWidth)
288
289
class Wav(AbstractWav):
    """A class for manipulating audio files

    The whole wav file is held in memory as a bytestring of raw frames.
    This can be very slow and take up lots of memory with large files.
    """

    def __init__(self, frames: bytes, params: List):
        """
        Args:
            frames: the raw bytes of the audio
            params: the wave parameters, indexed as (nchannels, sampleWidth,
                frameRate, nframes, comptype, compname)

        Raises:
            ArgumentError: the audio does not have exactly one channel
        """
        self.frames = frames
        super(Wav, self).__init__(params)

    def __eq__(self, other):
        # Only the audio data is compared, not the wave parameters
        if not isinstance(other, Wav):
            return False

        return self.frames == other.frames

    def _getIndexAtTime(self, startTime: float) -> int:
        """Gets the index in the frame bytestring for the given time

        The time is rounded to a whole number of samples before it is
        converted to a byte offset, so the index always falls on a sample
        boundary.  Rounding the byte offset directly could land in the
        middle of a multi-byte sample.
        """
        return round(startTime * self.frameRate) * self.sampleWidth

    @classmethod
    def open(cls, fn: str) -> "Wav":
        """Load an entire wave file into memory

        Args:
            fn: the path of the wave file to load

        Raises:
            ArgumentError: the audio does not have exactly one channel
        """
        # The file is only needed while reading; close it right after
        with wave.open(fn, "r") as wav:
            params = wav.getparams()
            audioFrames = wav.readframes(wav.getnframes())
        return cls(audioFrames, params)

    def concatenate(self, frames: bytes) -> None:
        """Append frames to the end of the audio"""
        self.frames += frames

    def deleteSegment(self, startTime: float, endTime: float) -> None:
        """Remove the audio between startTime and endTime"""
        i = self._getIndexAtTime(startTime)
        j = self._getIndexAtTime(endTime)
        self.frames = self.frames[:i] + self.frames[j:]

    @property
    def duration(self) -> float:
        """The duration, in seconds, of the audio currently held in memory"""
        return len(self.frames) / self.frameRate / self.sampleWidth

    def getFrames(self, startTime: float, endTime: float) -> bytes:
        """Get the raw bytes of the audio between startTime and endTime"""
        i = self._getIndexAtTime(startTime)
        j = self._getIndexAtTime(endTime)
        return self.frames[i:j]

    def getSamples(self, startTime: float, endTime: float) -> Tuple[int, ...]:
        """Get the audio between startTime and endTime as integer samples"""
        frames = self.getFrames(startTime, endTime)
        return convertFromBytes(frames, self.sampleWidth)

    def getSubwav(self, startTime: float, endTime: float) -> "Wav":
        """Get the audio between startTime and endTime as a new Wav"""
        frames = self.getFrames(startTime, endTime)
        return Wav(frames, self.params)

    def insert(self, startTime: float, frames: bytes) -> None:
        """Insert frames into the audio at startTime"""
        i = self._getIndexAtTime(startTime)
        self.frames = self.frames[:i] + frames + self.frames[i:]

    def new(self) -> "Wav":
        """Get a deep copy of this Wav"""
        return copy.deepcopy(self)

    def replaceSegment(self, startTime: float, endTime: float, frames: bytes):
        """Replace the audio between startTime and endTime with frames

        The replacement does not need to have the same duration as the
        segment that it replaces.
        """
        self.deleteSegment(startTime, endTime)
        self.insert(startTime, frames)

    def save(self, outputFN: str) -> None:
        """Write the audio to a wave file

        Args:
            outputFN: the path of the wave file to write
        """
        # The context manager closes the file, which guarantees that the
        # data is flushed and the header is finalized
        with wave.open(outputFN, "w") as outWave:
            outWave.setparams(
                [
                    self.nchannels,
                    self.sampleWidth,
                    self.frameRate,
                    # nframes counts frames, not bytes
                    len(self.frames) // self.sampleWidth,
                    self.comptype,
                    self.compname,
                ]
            )
            outWave.writeframes(self.frames)
367
368
class AudioGenerator:
    """Generates raw audio frames for a given sample width and frame rate"""

    def __init__(self, sampleWidth, frameRate):
        """
        Args:
            sampleWidth: the width in bytes of one sample
            frameRate: the number of frames per second
        """
        self.sampleWidth: int = sampleWidth
        self.frameRate: int = frameRate

    @classmethod
    def fromWav(cls, wav: AbstractWav) -> "AudioGenerator":
        """Build an AudioGenerator with parameters derived from a Wav or QueryWav"""
        # Using cls means that subclasses get an instance of their own type
        return cls(wav.sampleWidth, wav.frameRate)

    def buildSineWaveGenerator(self, frequency, amplitude) -> Callable[[float], bytes]:
        """Returns a function that takes a duration and returns a generated sine wave"""
        return partial(
            self.generateSineWave,
            frequency=frequency,
            amplitude=amplitude,
        )

    def generateSineWave(
        self,
        duration: float,
        frequency: int,
        amplitude: Optional[float] = None,
    ) -> bytes:
        """Generate the raw frames of a sine wave

        Args:
            duration: the length of the generated audio, in seconds
            frequency: the frequency of the sine wave, in cycles per second
            amplitude: the peak value of the wave; defaults to the largest
                value that fits in a sample of this generator's width

        Returns:
            The raw bytes of the generated audio
        """
        if amplitude is None:
            amplitude = calculateMaxAmplitude(self.sampleWidth)

        nSamples = round(duration * self.frameRate)
        # The phase advance, in radians, from one sample to the next
        wavSpec = 2 * math.pi * frequency / float(self.frameRate)
        sinWaveNums = [
            round(amplitude * math.sin(wavSpec * i)) for i in range(nSamples)
        ]
        return convertToBytes(tuple(sinWaveNums), self.sampleWidth)

    def generateSilence(self, duration: float) -> bytes:
        """Generate the raw frames of silence (all samples are zero)

        Args:
            duration: the length of the generated audio, in seconds

        Returns:
            The raw bytes of the generated audio
        """
        # "<" selects the same little-endian, standard-size layout that
        # convertToBytes uses
        zeroBinValue = struct.pack("<" + sampleWidthDict[self.sampleWidth], 0)
        return zeroBinValue * round(self.frameRate * duration)
406
407
def _findNextZeroCrossing(
    startTime: float,
    samples: Tuple[int, ...],
    frameRate: float,
    reverse: bool,
) -> Optional[float]:
    """Finds a zero crossing in a run of samples, searching in one direction

    A sample that is exactly zero is preferred.  If there is none, the
    search falls back to a place where the signal changes sign.

    Args:
        startTime: the time, in seconds, of the first sample in 'samples'
        samples: the samples to search
        frameRate: the number of samples per second
        reverse: passed through to the search helpers to select the
            search direction

    Returns:
        The time of the zero crossing, or None if there is none
    """
    crossingIndex = _getNearestZero(samples, reverse)
    if crossingIndex is None:
        crossingIndex = _getZeroThresholdCrossing(samples, reverse)

    if crossingIndex is None:
        return None

    offsetInSeconds = crossingIndex / float(frameRate)
    return startTime + offsetInSeconds
429
430
def _getNearestZero(samples: Tuple[int, ...], reverse: bool) -> Optional[int]:
    """Returns whatever utils.find reports for the value 0 in 'samples'

    'reverse' is passed through to utils.find unchanged.
    """
    zeroIndex = utils.find(samples, 0, reverse)
    return zeroIndex
433
434
def _getZeroThresholdCrossing(samples: Tuple[int, ...], reverse: bool) -> Optional[int]:
    """Finds a place where adjacent samples differ in sign

    Returns:
        The index of whichever of the two samples around the sign change
        is closer to zero, or None if no sign change is found
    """
    signs = [utils.sign(sample) for sample in samples]
    # signChanges[i] is True when sample i and sample i + 1 differ in sign
    signChanges = [current != following for current, following in zip(signs, signs[1:])]

    crossingIndex = utils.find(signChanges, True, reverse)
    if crossingIndex is None:
        return None

    # The sign change sits between two samples; report the one that is
    # closer to zero (the earlier one wins a tie)
    followingIndex = crossingIndex + 1
    if abs(samples[crossingIndex]) > abs(samples[followingIndex]):
        return followingIndex

    return crossingIndex
450
451
def _computeKeepDeleteIntervals(
    start: float,
    stop: float,
    keepIntervals: List[Tuple[float, float]] = None,
    deleteIntervals: List[Tuple[float, float]] = None,
) -> List[Tuple[float, float, str]]:
    """Labels the time between start and stop as 'keep' or 'delete'

    Only one of keepIntervals and deleteIntervals may be given; the other
    set is obtained from utils.invertIntervalList.  If neither is given,
    the whole span from start to stop is kept.

    Returns:
        A sorted list of (start, end, label) tuples

    Raises:
        ArgumentError: both keepIntervals and deleteIntervals were given
    """
    if keepIntervals and deleteIntervals:
        raise errors.ArgumentError(
            "You cannot specify both 'keepIntervals' or 'deleteIntervals'."
        )

    if deleteIntervals:
        # Only the first two values of each entry are used
        toDelete = [(entry[0], entry[1]) for entry in deleteIntervals]
        toKeep = utils.invertIntervalList(toDelete, start, stop)
    elif keepIntervals:
        toKeep = [(entry[0], entry[1]) for entry in keepIntervals]
        toDelete = utils.invertIntervalList(toKeep, start, stop)
    else:
        toKeep = [(start, stop)]
        toDelete = []

    labelled = [(begin, end, _KEEP) for begin, end in toKeep]
    labelled.extend((begin, end, _DELETE) for begin, end in toDelete)
    labelled.sort()

    return labelled
sampleWidthDict: Final = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
ZERO_CROSSING_TIMESTEP: Final = 0.002
DEFAULT_SINE_FREQUENCY = 200
NUM_BITS_IN_A_BYTE = 8
def calculateMaxAmplitude(sampleWidth: int) -> int:
32def calculateMaxAmplitude(sampleWidth: int) -> int:
33    """Gets the largest possible amplitude representable by a given sample width
34
35    The formula is 2^(n-1) - 1 where n is the number of bits
36    - the first -1 is because the result is signed
37    - the second -1 is because the value is 0 based
38    e.g. if n=3 then 2^(3-1)-1 => 3
39         if n=4 then 2^(4-1)-1 => 7
40
41    Args:
42        sampleWidth: the width in bytes of a sample in the wave file
43
44    Returns:
45        An integer
46    """
47    return 2 ** (sampleWidth * NUM_BITS_IN_A_BYTE - 1) - 1

Gets the largest possible amplitude representable by a given sample width

The formula is 2^(n-1) - 1 where n is the number of bits

  • the first -1 is because the result is signed
  • the second -1 is because the value is 0 based
e.g. if n=3 then 2^(3-1)-1 => 3; if n=4 then 2^(4-1)-1 => 7
Arguments:
  • sampleWidth: the width in bytes of a sample in the wave file
Returns:

An integer

def convertFromBytes(byteStr: bytes, sampleWidth: int) -> Tuple[int, ...]:
50def convertFromBytes(byteStr: bytes, sampleWidth: int) -> Tuple[int, ...]:
51    """Convert frames of a python wave object from bytes to numbers"""
52    byteCode = sampleWidthDict[sampleWidth]
53    actualNumFrames = int(len(byteStr) / float(sampleWidth))
54    audioFrameList = struct.unpack("<" + byteCode * actualNumFrames, byteStr)
55
56    return audioFrameList

Convert frames of a python wave object from bytes to numbers

def convertToBytes(numList: Tuple[int, ...], sampleWidth: int) -> bytes:
59def convertToBytes(numList: Tuple[int, ...], sampleWidth: int) -> bytes:
60    """Convert frames of a python wave object from numbers to bytes"""
61    byteCode = sampleWidthDict[sampleWidth]
62    byteStr = struct.pack("<" + byteCode * len(numList), *numList)
63
64    return byteStr

Convert frames of a python wave object from numbers to bytes

def extractSubwav(fn: str, outputFN: str, startTime: float, endTime: float) -> None:
67def extractSubwav(fn: str, outputFN: str, startTime: float, endTime: float) -> None:
68    """Get a subsegment of an audio file"""
69    wav = QueryWav(fn)
70    frames = wav.getFrames(startTime, endTime)
71    wav.outputFrames(frames, outputFN)

Get a subsegment of an audio file

def getDuration(fn: str) -> float:
74def getDuration(fn: str) -> float:
75    """Get the total duration of an audio file"""
76    return QueryWav(fn).duration

Get the total duration of an audio file

def readFramesAtTime(audiofile: wave.Wave_read, startTime: float, endTime: float) -> bytes:
79def readFramesAtTime(
80    audiofile: wave.Wave_read, startTime: float, endTime: float
81) -> bytes:
82    """Read the audio frames for the specified internal of an audio file"""
83    params = audiofile.getparams()
84    frameRate = params[2]
85
86    audiofile.setpos(round(frameRate * startTime))
87    frames = audiofile.readframes(round(frameRate * (endTime - startTime)))
88
89    return frames

Read the audio frames for the specified interval of an audio file

def readFramesAtTimes( audiofile: wave.Wave_read, keepIntervals: List[Tuple[float, float]] = None, deleteIntervals: List[Tuple[float, float]] = None, replaceFunc: Optional[Callable[[float], bytes]] = None) -> bytes:
 92def readFramesAtTimes(
 93    audiofile: wave.Wave_read,
 94    keepIntervals: List[Tuple[float, float]] = None,
 95    deleteIntervals: List[Tuple[float, float]] = None,
 96    replaceFunc: Optional[Callable[[float], bytes]] = None,
 97) -> bytes:
 98    """Reads an audio file into memory, with some configuration
 99
100    Args:
101        audiofile: the time to get the interval from
102        keepIntervals: duration of the interval
103        deleteIntervals: the maximum allowed time
104        replaceFunc: is the interval before or after the targetTime?
105
106    Returns:
107        A bytestring of the loaded audio file
108
109    Raises:
110        ArgumentError: The timestamps in keepIntervals or deleteIntervals exceed the audio duration
111        ArgumentError: Only one of keepIntervals and deleteIntervals can be specified
112    """
113    params = audiofile.getparams()
114    frameRate = params[2]
115    nframes = params[3]
116
117    duration = nframes / float(frameRate)
118    markedIntervals = _computeKeepDeleteIntervals(
119        0.0, duration, keepIntervals, deleteIntervals
120    )
121
122    if markedIntervals[-1][1] > duration:
123        raise errors.ArgumentError(
124            "Timestamps in keepIntervals and deleteIntervals cannot exceed wav file duration"
125        )
126
127    # Grab the sections to be kept
128    audioFrames: bytes = b""
129    for start, end, label in markedIntervals:
130        if label == _KEEP:
131            audioFrames += readFramesAtTime(audiofile, start, end)
132
133        # If we are not keeping a region and we're not shrinking the
134        # duration, fill in the deleted portions with zeros
135        elif label == _DELETE and replaceFunc:
136            audioFrames += replaceFunc(end - start)
137
138    return audioFrames

Reads an audio file into memory, with some configuration

Arguments:
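  • audiofile: an open wave file to read the frames from
  • keepIntervals: the (start, end) intervals, in seconds, to keep; the rest of the file is treated as deleted
  • deleteIntervals: the (start, end) intervals, in seconds, to delete; the rest of the file is kept
  • replaceFunc: if given, it is called with the duration of each deleted interval and the bytes it returns are used in place of that interval; if not given, deleted intervals are dropped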
Returns:

A bytestring of the loaded audio file

Raises:
  • ArgumentError: The timestamps in keepIntervals or deleteIntervals exceed the audio duration
  • ArgumentError: Only one of keepIntervals and deleteIntervals can be specified
class AbstractWav(abc.ABC):
141class AbstractWav(ABC):
142    def __init__(self, params: List):
143        self.params = params
144
145        self.nchannels: int = params[0]
146        self.sampleWidth: int = params[1]
147        self.frameRate: int = params[2]
148        self.nframes: int = params[3]
149        self.comptype = params[4]
150        self.compname = params[5]
151
152        if self.nchannels != 1:
153            raise (
154                errors.ArgumentError(
155                    "Only audio with a single channel can be loaded. "
156                    "Your file was #{self.nchannels}."
157                )
158            )
159
160    def _iterZeroCrossings(
161        self,
162        start: float,
163        withinThreshold,
164        step: float,
165        reverse: bool,
166    ) -> Optional[float]:
167        if not withinThreshold(start):
168            return None
169
170        startTime, endTime = utils.getInterval(start, step, self.duration, reverse)
171        samples = self.getSamples(startTime, endTime)
172
173        return _findNextZeroCrossing(startTime, samples, self.frameRate, reverse)
174
175    @property
176    @abstractmethod
177    def duration(self) -> float:  # pragma: no cover
178        pass
179
180    def findNearestZeroCrossing(
181        self, targetTime: float, timeStep: float = ZERO_CROSSING_TIMESTEP
182    ) -> float:
183        """Finds the nearest zero crossing at the given time in an audio file
184
185        Looks both before and after the timeStamp
186        """
187
188        leftStartTime = rightStartTime = targetTime
189
190        samplesPerStep = timeStep * self.frameRate
191        if samplesPerStep < 2:
192            raise errors.ArgumentError(
193                f"'timeStep' ({timeStep}) must be large enough to contain "
194                f"multiple samples for audio framerate ({self.frameRate})"
195            )
196
197        # Find zero crossings
198        smallestLeft = None
199        smallestRight = None
200        oneSampleDuration = 1 / self.frameRate
201        while True:
202            # Increasing our timeStep by one sample enables
203            # us to find zero-crossings that sit at the boundary
204            # of two samples (two different iterations of this loop)
205            smallestLeft = self._iterZeroCrossings(
206                leftStartTime, lambda x: x > 0, timeStep + oneSampleDuration, True
207            )
208            smallestRight = self._iterZeroCrossings(
209                rightStartTime,
210                lambda x: x + timeStep < self.duration,
211                timeStep + oneSampleDuration,
212                False,
213            )
214
215            if smallestLeft is not None or smallestRight is not None:
216                break
217            # TODO: I think this case shouldn't be possible
218            elif leftStartTime < 0 and rightStartTime > self.duration:
219                raise (errors.FindZeroCrossingError(0, self.duration))
220            else:
221                # oneSampleDuration is not added here
222                leftStartTime -= timeStep
223                rightStartTime += timeStep
224
225        # Under ordinary circumstances, this should not occur
226        if smallestLeft is None and smallestRight is None:
227            raise errors.FindZeroCrossingError(0, self.duration)
228
229        return utils.chooseClosestTime(targetTime, smallestLeft, smallestRight)
230
231    @abstractmethod
232    def getFrames(self, startTime: float, endTime: float) -> bytes:  # pragma: no cover
233        pass
234
235    @abstractmethod
236    def getSamples(
237        self, startTime: float, endTime: float
238    ) -> Tuple[int, ...]:  # pragma: no cover
239        pass
240
241    def outputFrames(self, frames: bytes, outputFN: str) -> None:
242        """Output frames using the same parameters as this Wav"""
243        outWave = wave.open(outputFN, "w")
244        outWave.setparams(
245            [
246                self.nchannels,
247                self.sampleWidth,
248                self.frameRate,
249                len(frames),
250                self.comptype,
251                self.compname,
252            ]
253        )
254        outWave.writeframes(frames)

Helper class that provides a standard way to create an ABC using inheritance.

params
nchannels: int
sampleWidth: int
frameRate: int
nframes: int
comptype
compname
duration: float
def findNearestZeroCrossing(self, targetTime: float, timeStep: float = 0.002) -> float:
180    def findNearestZeroCrossing(
181        self, targetTime: float, timeStep: float = ZERO_CROSSING_TIMESTEP
182    ) -> float:
183        """Finds the nearest zero crossing at the given time in an audio file
184
185        Looks both before and after the timeStamp
186        """
187
188        leftStartTime = rightStartTime = targetTime
189
190        samplesPerStep = timeStep * self.frameRate
191        if samplesPerStep < 2:
192            raise errors.ArgumentError(
193                f"'timeStep' ({timeStep}) must be large enough to contain "
194                f"multiple samples for audio framerate ({self.frameRate})"
195            )
196
197        # Find zero crossings
198        smallestLeft = None
199        smallestRight = None
200        oneSampleDuration = 1 / self.frameRate
201        while True:
202            # Increasing our timeStep by one sample enables
203            # us to find zero-crossings that sit at the boundary
204            # of two samples (two different iterations of this loop)
205            smallestLeft = self._iterZeroCrossings(
206                leftStartTime, lambda x: x > 0, timeStep + oneSampleDuration, True
207            )
208            smallestRight = self._iterZeroCrossings(
209                rightStartTime,
210                lambda x: x + timeStep < self.duration,
211                timeStep + oneSampleDuration,
212                False,
213            )
214
215            if smallestLeft is not None or smallestRight is not None:
216                break
217            # TODO: I think this case shouldn't be possible
218            elif leftStartTime < 0 and rightStartTime > self.duration:
219                raise (errors.FindZeroCrossingError(0, self.duration))
220            else:
221                # oneSampleDuration is not added here
222                leftStartTime -= timeStep
223                rightStartTime += timeStep
224
225        # Under ordinary circumstances, this should not occur
226        if smallestLeft is None and smallestRight is None:
227            raise errors.FindZeroCrossingError(0, self.duration)
228
229        return utils.chooseClosestTime(targetTime, smallestLeft, smallestRight)

Finds the nearest zero crossing at the given time in an audio file

Looks both before and after the timeStamp

@abstractmethod
def getFrames(self, startTime: float, endTime: float) -> bytes:
231    @abstractmethod
232    def getFrames(self, startTime: float, endTime: float) -> bytes:  # pragma: no cover
233        pass
@abstractmethod
def getSamples(self, startTime: float, endTime: float) -> Tuple[int, ...]:
235    @abstractmethod
236    def getSamples(
237        self, startTime: float, endTime: float
238    ) -> Tuple[int, ...]:  # pragma: no cover
239        pass
def outputFrames(self, frames: bytes, outputFN: str) -> None:
241    def outputFrames(self, frames: bytes, outputFN: str) -> None:
242        """Output frames using the same parameters as this Wav"""
243        outWave = wave.open(outputFN, "w")
244        outWave.setparams(
245            [
246                self.nchannels,
247                self.sampleWidth,
248                self.frameRate,
249                len(frames),
250                self.comptype,
251                self.compname,
252            ]
253        )
254        outWave.writeframes(frames)

Output frames using the same parameters as this Wav

class QueryWav(AbstractWav):
class QueryWav(AbstractWav):
    """A class for getting information about a wave file

    The wave file is never loaded--we only keep a reference to the
    file descriptor.  All operations on QueryWavs are fast.
    QueryWavs don't (shouldn't) change state.  For doing
    multiple modifications, use a Wav.

    The underlying file stays open for as long as the object is in use;
    call close() or use the instance in a with-statement to release it.
    """

    def __init__(self, fn: str):
        """Opens the wav file at fn and reads its header parameters"""
        self.audiofile = wave.open(fn, "r")
        try:
            super().__init__(self.audiofile.getparams())
        except Exception:
            # Don't leave the file open if initialization fails
            self.audiofile.close()
            raise

    def __enter__(self) -> "QueryWav":
        return self

    def __exit__(self, *excInfo) -> None:
        self.close()

    def close(self) -> None:
        """Closes the underlying wave file"""
        self.audiofile.close()

    @property
    def duration(self) -> float:
        """The length of the audio in seconds (number of frames / frame rate)"""
        return float(self.nframes) / self.frameRate

    def getFrames(
        self, startTime: Optional[float] = None, endTime: Optional[float] = None
    ) -> bytes:
        """Reads the frames between startTime and endTime from the file

        Args:
            startTime: where to start reading; None means the start of the file
            endTime: where to stop reading; None means the end of the file

        Returns:
            The frames as bytes
        """
        if startTime is None:
            startTime = 0

        if endTime is None:
            endTime = self.duration

        return readFramesAtTime(self.audiofile, startTime, endTime)

    def getSamples(
        self, startTime: Optional[float] = None, endTime: Optional[float] = None
    ) -> Tuple[int, ...]:
        """Reads the frames between startTime and endTime as a tuple of integers

        None for either bound is handled the same way as in getFrames().
        """
        frames = self.getFrames(startTime, endTime)
        return convertFromBytes(frames, self.sampleWidth)

A class for getting information about a wave file

The wave file is never loaded--we only keep a reference to the file descriptor. All operations on QueryWavs are fast. QueryWavs don't (shouldn't) change state. For doing multiple modifications, use a Wav.

QueryWav(fn: str)
266    def __init__(self, fn: str):
267        self.audiofile = wave.open(fn, "r")
268        super(QueryWav, self).__init__(self.audiofile.getparams())
audiofile
duration: float
def getFrames(self, startTime: float = None, endTime: float = None) -> bytes:
275    def getFrames(self, startTime: float = None, endTime: float = None) -> bytes:
276        if startTime is None:
277            startTime = 0
278
279        if endTime is None:
280            endTime = self.duration
281
282        return readFramesAtTime(self.audiofile, startTime, endTime)
def getSamples(self, startTime: float, endTime: float) -> Tuple[int, ...]:
284    def getSamples(self, startTime: float, endTime: float) -> Tuple[int, ...]:
285        frames = self.getFrames(startTime, endTime)
286        audioFrameList = convertFromBytes(frames, self.sampleWidth)
287
288        return audioFrameList
class Wav(AbstractWav):
class Wav(AbstractWav):
    """A class for manipulating audio files

    The wav file is held in memory as raw frame bytes (use getSamples()
    to view a span as signed integers).  This can be very slow and take
    up lots of memory with large files.

    NOTE: the time/index arithmetic in this class uses frameRate and
    sampleWidth only; the channel count is not factored in.
    """

    def __init__(self, frames: bytes, params: List):
        self.frames = frames
        super().__init__(params)

    def __eq__(self, other):
        # Only the audio data is compared, not the wav parameters
        if not isinstance(other, Wav):
            return False

        return self.frames == other.frames

    def _getIndexAtTime(self, startTime: float) -> int:
        """Gets the index in the frame list for the given time

        The time is rounded to the nearest whole sample first and then
        converted to a byte offset, so the returned index is always a
        multiple of sampleWidth and a slice can never split a sample.
        """
        return round(startTime * self.frameRate) * self.sampleWidth

    @classmethod
    def open(cls, fn: str) -> "Wav":
        """Loads the entire wav file at fn into memory"""
        # The reader is only needed while loading; close it afterwards
        with wave.open(fn, "r") as wav:
            params = wav.getparams()
            audioFrames = readFramesAtTime(
                wav, startTime=0, endTime=getDuration(fn)
            )
        return cls(audioFrames, params)

    def concatenate(self, frames: bytes) -> None:
        """Appends frames to the end of the audio"""
        self.frames += frames

    def deleteSegment(self, startTime: float, endTime: float) -> None:
        """Removes the audio between startTime and endTime"""
        i = self._getIndexAtTime(startTime)
        j = self._getIndexAtTime(endTime)
        self.frames = self.frames[:i] + self.frames[j:]

    @property
    def duration(self) -> float:
        """The length of the audio in seconds"""
        return len(self.frames) / self.frameRate / self.sampleWidth

    def getFrames(self, startTime: float, endTime: float) -> bytes:
        """Gets the audio between startTime and endTime as bytes"""
        i = self._getIndexAtTime(startTime)
        j = self._getIndexAtTime(endTime)
        return self.frames[i:j]

    def getSamples(self, startTime: float, endTime: float) -> Tuple[int, ...]:
        """Gets the audio between startTime and endTime as a tuple of integers"""
        frames = self.getFrames(startTime, endTime)
        return convertFromBytes(frames, self.sampleWidth)

    def getSubwav(self, startTime: float, endTime: float) -> "Wav":
        """Gets the audio between startTime and endTime as a new Wav"""
        frames = self.getFrames(startTime, endTime)
        return Wav(frames, self.params)

    def insert(self, startTime: float, frames: bytes) -> None:
        """Inserts frames into the audio at startTime"""
        i = self._getIndexAtTime(startTime)
        self.frames = self.frames[:i] + frames + self.frames[i:]

    def new(self) -> "Wav":
        """Returns a deep copy of this Wav"""
        return copy.deepcopy(self)

    def replaceSegment(self, startTime: float, endTime: float, frames: bytes) -> None:
        """Replaces the audio between startTime and endTime with frames"""
        self.deleteSegment(startTime, endTime)
        self.insert(startTime, frames)

    def save(self, outputFN: str) -> None:
        """Writes the audio to a wav file at outputFN"""
        # The wave header stores a count of frames, not a count of bytes
        bytesPerFrame = self.nchannels * self.sampleWidth
        numFrames = len(self.frames) // bytesPerFrame if bytesPerFrame else 0

        # The with-statement closes the file even when writing raises
        with wave.open(outputFN, "w") as outWave:
            outWave.setparams(
                (
                    self.nchannels,
                    self.sampleWidth,
                    self.frameRate,
                    numFrames,
                    self.comptype,
                    self.compname,
                )
            )
            outWave.writeframes(self.frames)

A class for manipulating audio files

The wav file is represented by its waveform as a series of signed integers. This can be very slow and take up lots of memory with large files.

Wav(frames: bytes, params: List)
299    def __init__(self, frames: bytes, params: List):
300        self.frames = frames
301        super(Wav, self).__init__(params)
frames
@classmethod
def open(cls, fn: str) -> Wav:
313    @classmethod
314    def open(cls, fn: str) -> "Wav":
315        wav = wave.open(fn, "r")
316        audioFrames = readFramesAtTime(wav, startTime=0, endTime=getDuration(fn))
317        return Wav(audioFrames, wav.getparams())
def concatenate(self, frames: bytes) -> None:
319    def concatenate(self, frames: bytes) -> None:
320        self.frames += frames
def deleteSegment(self, startTime: float, endTime: float) -> None:
322    def deleteSegment(self, startTime: float, endTime: float) -> None:
323        i = self._getIndexAtTime(startTime)
324        j = self._getIndexAtTime(endTime)
325        self.frames = self.frames[:i] + self.frames[j:]
duration: float
def getFrames(self, startTime: float, endTime: float) -> bytes:
331    def getFrames(self, startTime: float, endTime: float) -> bytes:
332        i = self._getIndexAtTime(startTime)
333        j = self._getIndexAtTime(endTime)
334        return self.frames[i:j]
def getSamples(self, startTime: float, endTime: float) -> Tuple[int, ...]:
336    def getSamples(self, startTime: float, endTime: float) -> Tuple[int, ...]:
337        frames = self.getFrames(startTime, endTime)
338        return convertFromBytes(frames, self.sampleWidth)
def getSubwav(self, startTime: float, endTime: float) -> Wav:
340    def getSubwav(self, startTime: float, endTime: float) -> "Wav":
341        frames = self.getFrames(startTime, endTime)
342        return Wav(frames, self.params)
def insert(self, startTime: float, frames: bytes) -> None:
344    def insert(self, startTime: float, frames: bytes) -> None:
345        i = self._getIndexAtTime(startTime)
346        self.frames = self.frames[:i] + frames + self.frames[i:]
def new(self) -> Wav:
348    def new(self) -> "Wav":
349        return copy.deepcopy(self)
def replaceSegment(self, startTime: float, endTime: float, frames: bytes):
351    def replaceSegment(self, startTime: float, endTime: float, frames: bytes):
352        self.deleteSegment(startTime, endTime)
353        self.insert(startTime, frames)
def save(self, outputFN: str) -> None:
355    def save(self, outputFN: str) -> None:
356        outWave = wave.open(outputFN, "w")
357        outWave.setparams(
358            [
359                self.nchannels,
360                self.sampleWidth,
361                self.frameRate,
362                len(self.frames),
363                self.comptype,
364                self.compname,
365            ]
366        )
367        outWave.writeframes(self.frames)
class AudioGenerator:
class AudioGenerator:
    """Generates audio (sine waves, silence) as bytes

    The output is sized using the sample width and frame rate given at
    construction.
    """

    def __init__(self, sampleWidth: int, frameRate: int):
        self.sampleWidth: int = sampleWidth
        self.frameRate: int = frameRate

    @classmethod
    def fromWav(cls, wav: "AbstractWav") -> "AudioGenerator":
        """Build an AudioGenerator with parameters derived from a Wav or QueryWav"""
        # Build through cls so subclasses get an instance of their own type
        return cls(wav.sampleWidth, wav.frameRate)

    def buildSineWaveGenerator(
        self, frequency: int, amplitude: Optional[float] = None
    ) -> Callable[[float], bytes]:
        """Returns a function that takes a duration and returns a generated sine wave

        Args:
            frequency: the frequency passed on to generateSineWave()
            amplitude: the amplitude passed on to generateSineWave();
                None is handled there
        """
        return partial(
            self.generateSineWave,
            frequency=frequency,
            amplitude=amplitude,
        )

    def generateSineWave(
        self,
        duration: float,
        frequency: int,
        amplitude: Optional[float] = None,
    ) -> bytes:
        """Generates a sine wave as bytes

        Args:
            duration: the length of the wave; round(duration * frameRate)
                samples are generated
            frequency: the frequency of the sine wave
            amplitude: the peak value of the wave; if None, the value of
                calculateMaxAmplitude() for this sample width is used
        """
        if amplitude is None:
            amplitude = calculateMaxAmplitude(self.sampleWidth)

        nSamples = round(duration * self.frameRate)
        # Phase advance, in radians, from one sample to the next
        wavSpec = 2 * math.pi * frequency / float(self.frameRate)
        sinWaveNums = [
            round(amplitude * math.sin(wavSpec * i)) for i in range(nSamples)
        ]
        return convertToBytes(tuple(sinWaveNums), self.sampleWidth)

    def generateSilence(self, duration: float) -> bytes:
        """Generates round(frameRate * duration) zero-valued samples as bytes"""
        zeroBinValue = struct.pack(sampleWidthDict[self.sampleWidth], 0)
        return zeroBinValue * round(self.frameRate * duration)
AudioGenerator(sampleWidth, frameRate)
371    def __init__(self, sampleWidth, frameRate):
372        self.sampleWidth: int = sampleWidth
373        self.frameRate: int = frameRate
sampleWidth: int
frameRate: int
@classmethod
def fromWav(cls, wav: AbstractWav) -> AudioGenerator:
375    @classmethod
376    def fromWav(cls, wav: AbstractWav) -> "AudioGenerator":
377        """Build an AudioGenerator with parameters derived from a Wav or QueryWav"""
378        return AudioGenerator(wav.sampleWidth, wav.frameRate)

Build an AudioGenerator with parameters derived from a Wav or QueryWav

def buildSineWaveGenerator(self, frequency, amplitude) -> Callable[[float], bytes]:
380    def buildSineWaveGenerator(self, frequency, amplitude) -> Callable[[float], bytes]:
381        """Returns a function that takes a duration and returns a generated sine wave"""
382        return partial(
383            self.generateSineWave,
384            frequency=frequency,
385            amplitude=amplitude,
386        )

Returns a function that takes a duration and returns a generated sine wave

def generateSineWave( self, duration: float, frequency: int, amplitude: Optional[float] = None) -> bytes:
388    def generateSineWave(
389        self,
390        duration: float,
391        frequency: int,
392        amplitude: Optional[float] = None,
393    ) -> bytes:
394        if amplitude is None:
395            amplitude = calculateMaxAmplitude(self.sampleWidth)
396
397        nSamples = round(duration * self.frameRate)
398        wavSpec = 2 * math.pi * frequency / float(self.frameRate)
399        sinWaveNums = [
400            round(amplitude * math.sin(wavSpec * i)) for i in range(nSamples)
401        ]
402        return convertToBytes(tuple(sinWaveNums), self.sampleWidth)
def generateSilence(self, duration: float) -> bytes:
404    def generateSilence(self, duration: float) -> bytes:
405        zeroBinValue = struct.pack(sampleWidthDict[self.sampleWidth], 0)
406        return zeroBinValue * round(self.frameRate * duration)