praatio.pitch_and_intensity

Functions for working with pitch data

This file depends on the praat script get_pitch_and_intensity.praat (which in turn depends on praat) to extract pitch and intensity values from audio data. Once the data is extracted, there are functions for normalizing it and for calculating various measures from the time-stamped output of the praat script (e.g. generatePIMeasures()).

For brevity, 'pitch_and_intensity' is referred to as 'PI'

see examples/get_pitch_and_formants.py

  1# coding: utf-8
  2"""Functions for working with pitch data
  3
  4This file depends on the praat script get_pitch_and_intensity.praat
  5(which depends on praat) to extract pitch and intensity values from
  6audio data.  Once the data is extracted, there are functions for
  7data normalization and calculating various measures from the time
  8stamped output of the praat script (ie **generatePIMeasures()**)
  9
 10For brevity, 'pitch_and_intensity' is referred to as 'PI'
 11
 12see **examples/get_pitch_and_formants.py**
 13"""
 14
 15import os
 16from os.path import join
 17import io
 18import math
 19from typing import List, Tuple, Optional, cast
 20
 21from praatio import data_points
 22from praatio import praatio_scripts
 23from praatio import textgrid
 24from praatio.utilities import errors
 25from praatio.utilities import my_math
 26from praatio.utilities import utils
 27from praatio.utilities.constants import Point
 28
 29
# Default pitch unit passed to the pitch-and-intensity praat script
HERTZ = "Hertz"
# Placeholder used by getPitchMeasures() when no file name or label is given
UNSPECIFIED = "unspecified"
# Name of the point tier that detectPitchErrors() adds to a textgrid
_PITCH_ERROR_TIER_NAME = "pitch errors"
 33
 34
 35def _extractPIPiecewise(
 36    inputFN: str,
 37    outputFN: str,
 38    praatEXE: str,
 39    minPitch: float,
 40    maxPitch: float,
 41    tgFN: str,
 42    tierName: str,
 43    tmpOutputPath: str,
 44    sampleStep: float = 0.01,
 45    silenceThreshold: float = 0.03,
 46    pitchUnit: str = HERTZ,
 47    forceRegenerate: bool = True,
 48    undefinedValue: float = None,
 49    medianFilterWindowSize: int = 0,
 50    pitchQuadInterp: bool = False,
 51) -> List[Tuple[float, ...]]:
 52    """Extracts pitch and int from each labeled interval in a textgrid
 53
 54    This has the benefit of being faster than using _extractPIFile if only
 55    labeled regions need to have their pitch values sampled, particularly
 56    for longer files.
 57
 58    Returns the result as a list.  Will load the serialized result
 59    if this has already been called on the appropriate files before
 60    """
 61    outputPath = os.path.split(outputFN)[0]
 62    utils.makeDir(outputPath)
 63
 64    windowSize = medianFilterWindowSize
 65
 66    if not os.path.exists(inputFN):
 67        raise errors.ArgumentError(f"Required folder does not exist: f{inputFN}")
 68
 69    firstTime = not os.path.exists(outputFN)
 70    if firstTime or forceRegenerate is True:
 71        utils.makeDir(tmpOutputPath)
 72        splitAudioList = praatio_scripts.splitAudioOnTier(
 73            inputFN, tgFN, tierName, tmpOutputPath, False
 74        )
 75        allPIList: List[Tuple[str, str, str]] = []
 76        for start, _, fn in splitAudioList:
 77            tmpTrackName = os.path.splitext(fn)[0] + ".txt"
 78            piList = _extractPIFile(
 79                join(tmpOutputPath, fn),
 80                join(tmpOutputPath, tmpTrackName),
 81                praatEXE,
 82                minPitch,
 83                maxPitch,
 84                sampleStep,
 85                silenceThreshold,
 86                pitchUnit,
 87                forceRegenerate=True,
 88                medianFilterWindowSize=windowSize,
 89                pitchQuadInterp=pitchQuadInterp,
 90            )
 91            convertedPiList = [
 92                ("%0.3f" % (float(time) + start), str(pV), str(iV))
 93                for time, pV, iV in piList
 94            ]
 95            allPIList.extend(convertedPiList)
 96
 97        outputData = [",".join(row) for row in allPIList]
 98        with open(outputFN, "w") as fd:
 99            fd.write("\n".join(outputData) + "\n")
100
101    return loadTimeSeriesData(outputFN, undefinedValue=undefinedValue)
102
103
def _extractPIFile(
    inputFN: str,
    outputFN: str,
    praatEXE: str,
    minPitch: float,
    maxPitch: float,
    sampleStep: float = 0.01,
    silenceThreshold: float = 0.03,
    pitchUnit: str = HERTZ,
    forceRegenerate: bool = True,
    undefinedValue: Optional[float] = None,
    medianFilterWindowSize: int = 0,
    pitchQuadInterp: bool = False,
) -> List[Tuple[float, ...]]:
    """Extracts pitch and intensity values from an audio file

    Runs the praat script get_pitch_and_intensity.praat on inputFN and
    loads the text file it writes to outputFN.

    Args:
        inputFN: the audio file to analyze
        outputFN: the text file the praat script writes to
        praatEXE: path to the praat executable
        forceRegenerate: if False and outputFN already exists, the
            existing file is loaded instead of being recomputed
        undefinedValue: forwarded to loadTimeSeriesData()
        pitchQuadInterp: if True, the script is asked to interpolate

    Returns:
        The data as loaded by loadTimeSeriesData()

    Raises:
        errors.ArgumentError: if inputFN does not exist
    """
    utils.makeDir(os.path.split(outputFN)[0])

    if not os.path.exists(inputFN):
        raise errors.ArgumentError(f"Required file does not exist: {inputFN}")

    firstTime = not os.path.exists(outputFN)
    if firstTime or forceRegenerate is True:
        # The praat script uses append mode, so we need to clear any prior
        # result
        if os.path.exists(outputFN):
            os.remove(outputFN)

        # The praat script takes the flag as a number, not a boolean
        doInterpolation = 1 if pitchQuadInterp is True else 0

        argList = [
            inputFN,
            outputFN,
            sampleStep,
            minPitch,
            maxPitch,
            silenceThreshold,
            pitchUnit,
            # NOTE(review): presumably start/end times, with -1 meaning
            # "whole file" -- confirm against the praat script
            -1,
            -1,
            medianFilterWindowSize,
            doInterpolation,
        ]

        scriptFN = join(utils.scriptsPath, "get_pitch_and_intensity.praat")
        utils.runPraatScript(praatEXE, scriptFN, argList)

    return loadTimeSeriesData(outputFN, undefinedValue=undefinedValue)
160
161
def extractIntensity(
    inputFN: str,
    outputFN: str,
    praatEXE: str,
    minPitch: float,
    sampleStep: float = 0.01,
    forceRegenerate: bool = True,
    undefinedValue: Optional[float] = None,
) -> List[Tuple[float, ...]]:
    """Extract the intensity for an audio file

    Calculates intensity using the following praat command:
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Intensity___.html

    Args:
        inputFN: the audio file to analyze
        outputFN: the text file the praat script writes to
        praatEXE: path to the praat executable
        forceRegenerate: if False and outputFN already exists, the
            existing file is loaded instead of being recomputed
        undefinedValue: forwarded to loadTimeSeriesData()

    Returns:
        The data as loaded by loadTimeSeriesData()

    Raises:
        errors.ArgumentError: if inputFN does not exist
    """
    utils.makeDir(os.path.split(outputFN)[0])

    if not os.path.exists(inputFN):
        raise errors.ArgumentError(f"Required file does not exist: {inputFN}")

    firstTime = not os.path.exists(outputFN)
    if firstTime or forceRegenerate is True:
        # The praat script uses append mode, so we need to clear any prior
        # result
        if os.path.exists(outputFN):
            os.remove(outputFN)

        # NOTE(review): the trailing -1, -1 are presumably start/end times
        # meaning "whole file" -- confirm against the praat script
        argList = [inputFN, outputFN, sampleStep, minPitch, -1, -1]

        scriptFN = join(utils.scriptsPath, "get_intensity.praat")
        utils.runPraatScript(praatEXE, scriptFN, argList)

    return loadTimeSeriesData(outputFN, undefinedValue=undefinedValue)
196
197
def extractPitchTier(
    wavFN: str,
    outputFN: str,
    praatEXE: str,
    minPitch: float,
    maxPitch: float,
    sampleStep: float = 0.01,
    silenceThreshold: float = 0.03,
    forceRegenerate: bool = True,
    medianFilterWindowSize: int = 0,
    pitchQuadInterp: bool = False,
) -> data_points.PointObject2D:
    """Extract pitch at regular intervals from the input wav file

    Data is output to a file by the praat script get_pitchtier.praat and
    then opened with data_points.open2DPointObject()

    Calculates pitch using the following praat command:
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Pitch___.html

    Args:
        wavFN: the audio file to analyze
        outputFN: the file the praat script writes to
        praatEXE: path to the praat executable
        sampleStep: the frequency to sample pitch at
        silenceThreshold: segments with lower intensity won't be analyzed
            for pitch
        forceRegenerate: if running this function for the same file, if False
            just read in the existing pitch file
        pitchQuadInterp: if True, quadratically interpolate pitch

    Returns:
        The pitch data

    Raises:
        errors.ArgumentError: if wavFN does not exist
    """
    utils.makeDir(os.path.split(outputFN)[0])

    # The praat script takes the flag as a number, not a boolean
    doInterpolation = 1 if pitchQuadInterp is True else 0

    if not os.path.exists(wavFN):
        raise errors.ArgumentError(f"Required file does not exist: {wavFN}")

    firstTime = not os.path.exists(outputFN)
    if firstTime or forceRegenerate is True:
        # Start from a clean slate so stale output is never read back
        if os.path.exists(outputFN):
            os.remove(outputFN)

        argList = [
            wavFN,
            outputFN,
            sampleStep,
            minPitch,
            maxPitch,
            silenceThreshold,
            medianFilterWindowSize,
            doInterpolation,
        ]

        scriptFN = join(utils.scriptsPath, "get_pitchtier.praat")
        utils.runPraatScript(praatEXE, scriptFN, argList)

    return data_points.open2DPointObject(outputFN)
262
263
def extractPitch(
    wavFN: str,
    outputFN: str,
    praatEXE: str,
    minPitch: float,
    maxPitch: float,
    sampleStep: float = 0.01,
    silenceThreshold: float = 0.03,
    forceRegenerate: bool = True,
    undefinedValue: Optional[float] = None,
    medianFilterWindowSize: int = 0,
    pitchQuadInterp: bool = False,
) -> List[Tuple[float, ...]]:
    """Extract pitch at regular intervals from the input wav file

    Data is output to a text file and then returned in a list in the form
    [(timeV1, pitchV1), (timeV2, pitchV2), ...]

    Calculates pitch using the following praat command:
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Pitch___.html

    Args:
        wavFN: the audio file to analyze
        outputFN: the text file the praat script writes to
        praatEXE: path to the praat executable
        sampleStep: the frequency to sample pitch at
        silenceThreshold: segments with lower intensity won't be analyzed
            for pitch
        forceRegenerate: if running this function for the same file, if False
            just read in the existing pitch file
        undefinedValue: if None remove from the dataset, otherwise set to
            undefinedValue
        pitchQuadInterp: if True, quadratically interpolate pitch

    Returns:
        The data as loaded by loadTimeSeriesData()

    Raises:
        errors.ArgumentError: if wavFN does not exist
    """
    utils.makeDir(os.path.split(outputFN)[0])

    # The praat script takes the flag as a number, not a boolean
    doInterpolation = 1 if pitchQuadInterp is True else 0

    if not os.path.exists(wavFN):
        raise errors.ArgumentError(f"Required file does not exist: {wavFN}")

    firstTime = not os.path.exists(outputFN)
    if firstTime or forceRegenerate is True:
        # Start from a clean slate so stale output is never read back
        if os.path.exists(outputFN):
            os.remove(outputFN)

        argList = [
            wavFN,
            outputFN,
            sampleStep,
            minPitch,
            maxPitch,
            silenceThreshold,
            # NOTE(review): presumably start/end times, with -1 meaning
            # "whole file" -- confirm against the praat script
            -1,
            -1,
            medianFilterWindowSize,
            doInterpolation,
        ]

        scriptFN = join(utils.scriptsPath, "get_pitch.praat")
        utils.runPraatScript(praatEXE, scriptFN, argList)

    return loadTimeSeriesData(outputFN, undefinedValue=undefinedValue)
330
331
def extractPI(
    inputFN: str,
    outputFN: str,
    praatEXE: str,
    minPitch: float,
    maxPitch: float,
    sampleStep: float = 0.01,
    silenceThreshold: float = 0.03,
    pitchUnit: str = HERTZ,
    forceRegenerate: bool = True,
    tgFN: str = None,
    tierName: str = None,
    tmpOutputPath: str = None,
    undefinedValue: float = None,
    medianFilterWindowSize: int = 0,
    pitchQuadInterp: bool = False,
) -> List[Tuple[float, ...]]:
    """Extracts pitch and intensity from a file, either whole or piecewise

    When both tgFN and tierName are given, only the labeled segments of
    that tier are analyzed (via _extractPIPiecewise).  If either one is
    missing, the entire file is analyzed (via _extractPIFile).

    Suggested pitch ranges:
        male: minPitch=50; maxPitch=350
        female: minPitch=75; maxPitch=450
    pitchUnit: "Hertz", "semitones re 100 Hz", etc

    In piecewise mode, tmpOutputPath defaults to a "piecewise_output"
    folder next to outputFN.

    Calculates pitch and intensity using the following praat command:
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Pitch___.html
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Intensity___.html
    """
    hasTierInfo = tgFN is not None and tierName is not None

    # Without a complete textgrid/tier pair, analyze the whole recording
    if not hasTierInfo:
        return _extractPIFile(
            inputFN,
            outputFN,
            praatEXE,
            minPitch,
            maxPitch,
            sampleStep,
            silenceThreshold,
            pitchUnit,
            forceRegenerate,
            undefinedValue=undefinedValue,
            medianFilterWindowSize=medianFilterWindowSize,
            pitchQuadInterp=pitchQuadInterp,
        )

    if tmpOutputPath is None:
        outputFolder = os.path.split(outputFN)[0]
        tmpOutputPath = join(outputFolder, "piecewise_output")

    return _extractPIPiecewise(
        inputFN,
        outputFN,
        praatEXE,
        minPitch,
        maxPitch,
        tgFN,
        tierName,
        tmpOutputPath,
        sampleStep,
        silenceThreshold,
        pitchUnit,
        forceRegenerate,
        undefinedValue=undefinedValue,
        medianFilterWindowSize=medianFilterWindowSize,
        pitchQuadInterp=pitchQuadInterp,
    )
405
406
def loadTimeSeriesData(
    fn: str, undefinedValue: Optional[float] = None
) -> List[Tuple[float, ...]]:
    """For reading the output of get_pitch_and_intensity or get_intensity

    The file is expected to hold comma-separated rows whose first column
    is a timestamp.  An optional header row (first cell "time") is skipped,
    as are blank lines.  An empty file yields an empty list.

    Args:
        fn: the file to read (utf-8)
        undefinedValue: any cell containing "--" is considered undefined.
            If undefinedValue is None, rows with an undefined cell are
            dropped; otherwise the cell is replaced with undefinedValue.

    Returns:
        Data of the form
        [(time1, value1a, value1b, ...),
         (time2, value2a, value2b, ...), ]

    Raises:
        IOError: if the file cannot be read (a message is printed first)
    """
    name = os.path.splitext(os.path.split(fn)[1])[0]

    try:
        with io.open(fn, "r", encoding="utf-8") as fd:
            data = fd.read()
    except IOError:
        print(f"No pitch track for: {name}")
        raise

    dataList = [row.split(",") for row in data.splitlines() if row.strip() != ""]

    # The new praat script includes a header.  Guard against files with
    # no rows at all, which would otherwise raise an IndexError here.
    if dataList and dataList[0][0] == "time":
        dataList = dataList[1:]

    newDataList: List[Tuple[float, ...]] = []
    for row in dataList:
        entry = [float(row[0])]
        for value in row[1:]:
            if "--" in value:
                if undefinedValue is None:
                    # Drop the whole row; the for-else below is skipped
                    break
                entry.append(undefinedValue)
            else:
                entry.append(float(value))
        else:
            newDataList.append(tuple(entry))

    return newDataList
456
457
def generatePIMeasures(
    dataList: List[Tuple[float, float, float]],
    tgFN: str,
    tierName: str,
    doPitch: bool,
    medianFilterWindowSize: int = None,
    globalZNormalization: bool = False,
    localZNormalizationWindowSize: int = 0,
) -> List[Tuple[float, ...]]:
    """Generates processed values for the labeled intervals in a textgrid

    For each interval of the named tier, the (time, pitch, intensity)
    samples falling inside it are reduced either to the pitch measures
    returned by getPitchMeasures() or to a single rms intensity value.

    Args:
        dataList: rows of (time, pitch, intensity)
        tgFN: the textgrid to read the intervals from
        tierName: the interval tier to use
        doPitch: if True get pitch measures; if False get rms intensity
        medianFilterWindowSize: if None, no filtering is done
        globalZNormalization: if True, values are normalized with the mean
            and stdDev of the data in dataList
        localZNormalizationWindowSize: if greater than 0, the output
            values are normalized with the mean and stdDev of the local
            context (for a window of 5, it would consider the current
            value, 2 values before and 2 values after)

    Raises:
        errors.NormalizationException: if both global and local
            normalization are requested
        errors.IncompatibleTierError: if the tier is not an interval tier
    """

    # Normalizing a second time would nullify the first normalization
    if globalZNormalization is True and localZNormalizationWindowSize > 0:
        raise errors.NormalizationException()

    castDataList = cast(List[Tuple[float, ...]], dataList)
    if globalZNormalization is True:
        # Column 1 holds pitch, column 2 holds intensity
        valueColumn = 1 if doPitch else 2
        castDataList = my_math.znormalizeSpeakerData(castDataList, valueColumn, True)

    # Raw values should have 0 filtered; normalized values are centered
    # around 0, so they must not be filtered
    filterZeroFlag = not globalZNormalization

    tg = textgrid.openTextgrid(tgFN, False)
    candidateTier = tg.getTier(tierName)
    if not isinstance(candidateTier, textgrid.IntervalTier):
        raise errors.IncompatibleTierError(candidateTier)

    tier = cast(textgrid.IntervalTier, candidateTier)
    piData = tier.getValuesInIntervals(castDataList)

    outputList: List[List[float]] = []
    for interval, entries in piData:
        label = interval[0]

        if doPitch:
            pitchVals = [f0Val for _, f0Val, _ in entries]
            outputList.append(
                list(
                    getPitchMeasures(
                        pitchVals, tgFN, label, medianFilterWindowSize, filterZeroFlag
                    )
                )
            )
            continue

        intensityVals = [intensityVal for _, _, intensityVal in entries]
        if filterZeroFlag:
            intensityVals = [val for val in intensityVals if val != 0.0]

        rmsIntensity = my_math.rms(intensityVals) if len(intensityVals) != 0 else 0.0
        outputList.append([rmsIntensity])

    # Locally normalize the output, one measure (column) at a time
    if localZNormalizationWindowSize > 0 and len(outputList) > 0:
        numColumns = len(outputList[0])
        for colI in range(numColumns):
            normalizedCol = my_math.znormWindowFilter(
                [row[colI] for row in outputList],
                localZNormalizationWindowSize,
                True,
                True,
            )
            if len(normalizedCol) != len(outputList):  # This should hopefully not happen
                raise errors.UnexpectedError(
                    "Lists must be of the same length but are not: "
                    f"({len(normalizedCol)}), ({len(outputList)})"
                )

            for rowI, val in enumerate(normalizedCol):
                outputList[rowI][colI] = val

    return [tuple(row) for row in outputList]
545
546
def getPitchMeasures(
    f0Values: List[float],
    name: Optional[str] = None,
    label: Optional[str] = None,
    medianFilterWindowSize: Optional[int] = None,
    filterZeroFlag: bool = False,
) -> Tuple[float, float, float, float, float, float]:
    """Get various measures (min, max, etc) for the passed in list of pitch values

    Args:
        f0Values: the pitch values to summarize
        name: the name of the file; only used in the "no data" message.
        label: the label of the current interval; only used in the
            "no data" message.
        medianFilterWindowSize: if None, there is no median filtering
        filterZeroFlag: if True, values whose integer part is zero
            (i.e. values strictly between -1 and 1) are removed

    Returns:
        (mean, max, min, range, variance, standard deviation).  The
        variance is the population variance.  If no values remain after
        filtering, a message is printed and all measures are 0.0.
    """

    if name is None:
        name = UNSPECIFIED
    if label is None:
        label = UNSPECIFIED

    if medianFilterWindowSize is not None:
        f0Values = my_math.medianFilter(
            f0Values, medianFilterWindowSize, useEdgePadding=True
        )

    if filterZeroFlag:
        f0Values = [f0Val for f0Val in f0Values if int(f0Val) != 0]

    if len(f0Values) == 0:
        myStr = f"No pitch data for file: {name}, label: {label}"
        # Replace non-ascii characters so printing cannot fail on limited
        # consoles; decode again so a str (not a b'...' repr) is printed
        print(myStr.encode("ascii", "replace").decode("ascii"))
        return (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

    counts = float(len(f0Values))
    meanF0 = sum(f0Values) / counts
    maxF0 = max(f0Values)
    minF0 = min(f0Values)
    rangeF0 = maxF0 - minF0

    variance = sum((val - meanF0) ** 2 for val in f0Values) / counts
    std = math.sqrt(variance)

    return (meanF0, maxF0, minF0, rangeF0, variance, std)
597
598
def detectPitchErrors(
    pitchList: List[Tuple[float, float]],
    maxJumpThreshold: float = 0.70,
    tgToMark: Optional[textgrid.Textgrid] = None,
) -> Tuple[List[Point], Optional[textgrid.Textgrid]]:
    """Detect pitch halving and doubling errors.

    Each sample is compared with the one before it.  A sample is flagged
    when the previous pitch is at or below (current * maxJumpThreshold)
    or at or above (current / maxJumpThreshold).

    If a textgrid is passed in, it adds the markings to the textgrid

    Args:
        pitchList: rows whose first item is a time and second a pitch
        maxJumpThreshold: a ratio greater than 0 and no more than 1
        tgToMark: if given, a point tier holding the flagged samples is
            added to it

    Returns:
        (the flagged points, tgToMark).  Each point is labeled with the
        ratio current/previous.  When the previous pitch is 0 the ratio is
        undefined: the point is labeled "inf", or skipped if the current
        pitch is 0 as well (no jump occurred).

    Raises:
        errors.ArgumentError: if maxJumpThreshold is out of range or the
            textgrid already has a tier with the pitch-error tier name
    """
    # A threshold of 0 would cause a division by zero below
    if maxJumpThreshold <= 0 or maxJumpThreshold > 1:
        raise errors.ArgumentError(
            "'maxJumpThreshold' must be greater than 0 and no more than 1.  "
            f"Was given ({maxJumpThreshold})"
        )

    tierName = _PITCH_ERROR_TIER_NAME
    if tgToMark is not None and tierName in tgToMark.tierNames:
        raise errors.ArgumentError(
            f"Tier name '{tierName}' is already in provided textgrid"
        )

    errorList = []
    for previous, current in zip(pitchList, pitchList[1:]):
        lastPitch = previous[1]
        currentPitch = current[1]

        ceilingCutoff = currentPitch / maxJumpThreshold
        floorCutoff = currentPitch * maxJumpThreshold
        if (lastPitch > floorCutoff) and (lastPitch < ceilingCutoff):
            continue

        if lastPitch != 0:
            ratioLabel = str(currentPitch / lastPitch)
        elif currentPitch != 0:
            # The ratio is undefined when jumping away from zero
            ratioLabel = "inf"
        else:
            # Zero followed by zero is not a jump
            continue

        errorList.append(Point(current[0], ratioLabel))

    if tgToMark is not None:
        pointTier = textgrid.PointTier(
            tierName, errorList, tgToMark.minTimestamp, tgToMark.maxTimestamp
        )
        tgToMark.addTier(pointTier)

    return errorList, tgToMark
HERTZ = 'Hertz'
UNSPECIFIED = 'unspecified'
def extractIntensity( inputFN: str, outputFN: str, praatEXE: str, minPitch: float, sampleStep: float = 0.01, forceRegenerate: bool = True, undefinedValue: float = None) -> List[Tuple[float, ...]]:
def extractIntensity(
    inputFN: str,
    outputFN: str,
    praatEXE: str,
    minPitch: float,
    sampleStep: float = 0.01,
    forceRegenerate: bool = True,
    undefinedValue: Optional[float] = None,
) -> List[Tuple[float, ...]]:
    """Extract the intensity for an audio file

    Calculates intensity using the following praat command:
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Intensity___.html

    Args:
        inputFN: the audio file to analyze
        outputFN: the text file the praat script writes to
        praatEXE: path to the praat executable
        forceRegenerate: if False and outputFN already exists, the
            existing file is loaded instead of being recomputed
        undefinedValue: forwarded to loadTimeSeriesData()

    Returns:
        The data as loaded by loadTimeSeriesData()

    Raises:
        errors.ArgumentError: if inputFN does not exist
    """
    utils.makeDir(os.path.split(outputFN)[0])

    if not os.path.exists(inputFN):
        raise errors.ArgumentError(f"Required file does not exist: {inputFN}")

    firstTime = not os.path.exists(outputFN)
    if firstTime or forceRegenerate is True:
        # The praat script uses append mode, so we need to clear any prior
        # result
        if os.path.exists(outputFN):
            os.remove(outputFN)

        # NOTE(review): the trailing -1, -1 are presumably start/end times
        # meaning "whole file" -- confirm against the praat script
        argList = [inputFN, outputFN, sampleStep, minPitch, -1, -1]

        scriptFN = join(utils.scriptsPath, "get_intensity.praat")
        utils.runPraatScript(praatEXE, scriptFN, argList)

    return loadTimeSeriesData(outputFN, undefinedValue=undefinedValue)

Extract the intensity for an audio file

Calculates intensity using the following praat command: https://www.fon.hum.uva.nl/praat/manual/Sound__To_Intensity___.html

def extractPitchTier( wavFN: str, outputFN: str, praatEXE: str, minPitch: float, maxPitch: float, sampleStep: float = 0.01, silenceThreshold: float = 0.03, forceRegenerate: bool = True, medianFilterWindowSize: int = 0, pitchQuadInterp: bool = False) -> praatio.data_classes.data_point.PointObject2D:
def extractPitchTier(
    wavFN: str,
    outputFN: str,
    praatEXE: str,
    minPitch: float,
    maxPitch: float,
    sampleStep: float = 0.01,
    silenceThreshold: float = 0.03,
    forceRegenerate: bool = True,
    medianFilterWindowSize: int = 0,
    pitchQuadInterp: bool = False,
) -> data_points.PointObject2D:
    """Extract pitch at regular intervals from the input wav file

    Data is output to a file by the praat script get_pitchtier.praat and
    then opened with data_points.open2DPointObject()

    Calculates pitch using the following praat command:
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Pitch___.html

    Args:
        wavFN: the audio file to analyze
        outputFN: the file the praat script writes to
        praatEXE: path to the praat executable
        sampleStep: the frequency to sample pitch at
        silenceThreshold: segments with lower intensity won't be analyzed
            for pitch
        forceRegenerate: if running this function for the same file, if False
            just read in the existing pitch file
        pitchQuadInterp: if True, quadratically interpolate pitch

    Returns:
        The pitch data

    Raises:
        errors.ArgumentError: if wavFN does not exist
    """
    utils.makeDir(os.path.split(outputFN)[0])

    # The praat script takes the flag as a number, not a boolean
    doInterpolation = 1 if pitchQuadInterp is True else 0

    if not os.path.exists(wavFN):
        raise errors.ArgumentError(f"Required file does not exist: {wavFN}")

    firstTime = not os.path.exists(outputFN)
    if firstTime or forceRegenerate is True:
        # Start from a clean slate so stale output is never read back
        if os.path.exists(outputFN):
            os.remove(outputFN)

        argList = [
            wavFN,
            outputFN,
            sampleStep,
            minPitch,
            maxPitch,
            silenceThreshold,
            medianFilterWindowSize,
            doInterpolation,
        ]

        scriptFN = join(utils.scriptsPath, "get_pitchtier.praat")
        utils.runPraatScript(praatEXE, scriptFN, argList)

    return data_points.open2DPointObject(outputFN)

Extract pitch at regular intervals from the input wav file

Data is output to a text file and then returned in a list in the form [(timeV1, pitchV1), (timeV2, pitchV2), ...]

Calculates pitch using the following praat command: https://www.fon.hum.uva.nl/praat/manual/Sound__To_Pitch___.html

Arguments:
  • sampleStep: the frequency to sample pitch at
  • silenceThreshold: segments with lower intensity won't be analyzed for pitch
  • forceRegenerate: if running this function for the same file, if False just read in the existing pitch file
  • pitchQuadInterp: if True, quadratically interpolate pitch
Returns:

The pitch data

def extractPitch( wavFN: str, outputFN: str, praatEXE: str, minPitch: float, maxPitch: float, sampleStep: float = 0.01, silenceThreshold: float = 0.03, forceRegenerate: bool = True, undefinedValue: float = None, medianFilterWindowSize: int = 0, pitchQuadInterp: bool = False) -> List[Tuple[float, ...]]:
def extractPitch(
    wavFN: str,
    outputFN: str,
    praatEXE: str,
    minPitch: float,
    maxPitch: float,
    sampleStep: float = 0.01,
    silenceThreshold: float = 0.03,
    forceRegenerate: bool = True,
    undefinedValue: Optional[float] = None,
    medianFilterWindowSize: int = 0,
    pitchQuadInterp: bool = False,
) -> List[Tuple[float, ...]]:
    """Extract pitch at regular intervals from the input wav file

    Data is output to a text file and then returned in a list in the form
    [(timeV1, pitchV1), (timeV2, pitchV2), ...]

    Calculates pitch using the following praat command:
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Pitch___.html

    Args:
        wavFN: the audio file to analyze
        outputFN: the text file the praat script writes to
        praatEXE: path to the praat executable
        sampleStep: the frequency to sample pitch at
        silenceThreshold: segments with lower intensity won't be analyzed
            for pitch
        forceRegenerate: if running this function for the same file, if False
            just read in the existing pitch file
        undefinedValue: if None remove from the dataset, otherwise set to
            undefinedValue
        pitchQuadInterp: if True, quadratically interpolate pitch

    Returns:
        The data as loaded by loadTimeSeriesData()

    Raises:
        errors.ArgumentError: if wavFN does not exist
    """
    utils.makeDir(os.path.split(outputFN)[0])

    # The praat script takes the flag as a number, not a boolean
    doInterpolation = 1 if pitchQuadInterp is True else 0

    if not os.path.exists(wavFN):
        raise errors.ArgumentError(f"Required file does not exist: {wavFN}")

    firstTime = not os.path.exists(outputFN)
    if firstTime or forceRegenerate is True:
        # Start from a clean slate so stale output is never read back
        if os.path.exists(outputFN):
            os.remove(outputFN)

        argList = [
            wavFN,
            outputFN,
            sampleStep,
            minPitch,
            maxPitch,
            silenceThreshold,
            # NOTE(review): presumably start/end times, with -1 meaning
            # "whole file" -- confirm against the praat script
            -1,
            -1,
            medianFilterWindowSize,
            doInterpolation,
        ]

        scriptFN = join(utils.scriptsPath, "get_pitch.praat")
        utils.runPraatScript(praatEXE, scriptFN, argList)

    return loadTimeSeriesData(outputFN, undefinedValue=undefinedValue)

Extract pitch at regular intervals from the input wav file

Data is output to a text file and then returned in a list in the form [(timeV1, pitchV1), (timeV2, pitchV2), ...]

Calculates pitch using the following praat command: https://www.fon.hum.uva.nl/praat/manual/Sound__To_Pitch___.html

Arguments:
  • sampleStep - the frequency to sample pitch at
  • silenceThreshold - segments with lower intensity won't be analyzed for pitch
  • forceRegenerate - if running this function for the same file, if False just read in the existing pitch file
  • undefinedValue - if None remove from the dataset, otherwise set to undefinedValue
  • pitchQuadInterp - if True, quadratically interpolate pitch
def extractPI( inputFN: str, outputFN: str, praatEXE: str, minPitch: float, maxPitch: float, sampleStep: float = 0.01, silenceThreshold: float = 0.03, pitchUnit: str = 'Hertz', forceRegenerate: bool = True, tgFN: str = None, tierName: str = None, tmpOutputPath: str = None, undefinedValue: float = None, medianFilterWindowSize: int = 0, pitchQuadInterp: bool = False) -> List[Tuple[float, ...]]:
def extractPI(
    inputFN: str,
    outputFN: str,
    praatEXE: str,
    minPitch: float,
    maxPitch: float,
    sampleStep: float = 0.01,
    silenceThreshold: float = 0.03,
    pitchUnit: str = HERTZ,
    forceRegenerate: bool = True,
    tgFN: str = None,
    tierName: str = None,
    tmpOutputPath: str = None,
    undefinedValue: float = None,
    medianFilterWindowSize: int = 0,
    pitchQuadInterp: bool = False,
) -> List[Tuple[float, ...]]:
    """Extracts pitch and intensity, for the whole file or per labeled segment

    This is a dispatcher.  When both **tgFN** and **tierName** are given, the
    work is handed to _extractPIPiecewise(), which limits extraction to the
    labeled segments of that tier.  If either one is None, the work is handed
    to _extractPIFile() and the entire file is processed.

    Suggested pitch ranges:
        male: minPitch=50; maxPitch=350
        female: minPitch=75; maxPitch=450
    pitchUnit: "Hertz", "semitones re 100 Hz", etc

    Pitch and intensity are calculated with the following praat commands:
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Pitch___.html
    https://www.fon.hum.uva.nl/praat/manual/Sound__To_Intensity___.html

    Args:
        tgFN: textgrid to take segments from; None for wholesale extraction
        tierName: tier to take segments from; None for wholesale extraction
        tmpOutputPath: working folder for piecewise extraction; if None,
            a "piecewise_output" folder next to outputFN is used

    Returns:
        whatever the selected helper returns
    """
    outputDir = os.path.split(outputFN)[0]

    # Without both a textgrid and a tier there are no segments to work from
    if tgFN is None or tierName is None:
        return _extractPIFile(
            inputFN,
            outputFN,
            praatEXE,
            minPitch,
            maxPitch,
            sampleStep,
            silenceThreshold,
            pitchUnit,
            forceRegenerate,
            undefinedValue=undefinedValue,
            medianFilterWindowSize=medianFilterWindowSize,
            pitchQuadInterp=pitchQuadInterp,
        )

    if tmpOutputPath is None:
        tmpOutputPath = join(outputDir, "piecewise_output")

    return _extractPIPiecewise(
        inputFN,
        outputFN,
        praatEXE,
        minPitch,
        maxPitch,
        tgFN,
        tierName,
        tmpOutputPath,
        sampleStep,
        silenceThreshold,
        pitchUnit,
        forceRegenerate,
        undefinedValue=undefinedValue,
        medianFilterWindowSize=medianFilterWindowSize,
        pitchQuadInterp=pitchQuadInterp,
    )

Extracts pitch and intensity from a file wholesale or piecewise

If the parameters for a tg are passed in, this will only extract labeled segments in a tier of the tg. Otherwise, pitch will be extracted from the entire file.

male: minPitch=50; maxPitch=350. female: minPitch=75; maxPitch=450. pitchUnit: "Hertz", "semitones re 100 Hz", etc.

Calculates pitch and intensity using the following praat commands: https://www.fon.hum.uva.nl/praat/manual/Sound__To_Pitch___.html and https://www.fon.hum.uva.nl/praat/manual/Sound__To_Intensity___.html

def loadTimeSeriesData(fn: str, undefinedValue: float = None) -> List[Tuple[float, ...]]:
def loadTimeSeriesData(
    fn: str, undefinedValue: Optional[float] = None
) -> List[Tuple[float, ...]]:
    """For reading the output of get_pitch_and_intensity or get_intensity

    The file is expected to hold comma-separated rows, optionally preceded
    by a header row whose first cell is "time".  The first column of each
    row is the timestamp; the remaining columns are values.

    Args:
        fn: path of the file to read (decoded as utf-8)
        undefinedValue: replacement for cells containing "--" (how undefined
            measurements are detected).  If None, any row that
            contains such a cell is dropped from the output.

    Returns:
        Data of the form
        [(time1, value1a, value1b, ...),
         (time2, value2a, value2b, ...), ]
        An empty list is returned if the file holds no data rows.

    Raises:
        IOError: if the file cannot be read (a notice is printed first)
        ValueError: if a cell is neither a number nor an undefined marker
    """
    name = os.path.splitext(os.path.split(fn)[1])[0]

    try:
        with io.open(fn, "r", encoding="utf-8") as fd:
            data = fd.read()
    except IOError:
        print(f"No pitch track for: {name}")
        raise

    # Blank and whitespace-only lines carry no data
    dataList = [row.split(",") for row in data.splitlines() if row.strip() != ""]

    # The new praat script includes a header; guard against files with no rows
    if dataList and dataList[0][0] == "time":
        dataList = dataList[1:]

    newDataList: List[Tuple[float, ...]] = []
    for row in dataList:
        entry = [float(row[0])]
        for value in row[1:]:
            if "--" not in value:
                entry.append(float(value))
            elif undefinedValue is not None:
                entry.append(undefinedValue)
            else:
                # Undefined cell and no replacement given: drop the whole row
                break
        else:
            newDataList.append(tuple(entry))

    return newDataList

For reading the output of get_pitch_and_intensity or get_intensity

Data should be of the form [(time1, value1a, value1b, ...), (time2, value2a, value2b, ...), ]

def generatePIMeasures( dataList: List[Tuple[float, float, float]], tgFN: str, tierName: str, doPitch: bool, medianFilterWindowSize: int = None, globalZNormalization: bool = False, localZNormalizationWindowSize: int = 0) -> List[Tuple[float, ...]]:
def generatePIMeasures(
    dataList: List[Tuple[float, float, float]],
    tgFN: str,
    tierName: str,
    doPitch: bool,
    medianFilterWindowSize: int = None,
    globalZNormalization: bool = False,
    localZNormalizationWindowSize: int = 0,
) -> List[Tuple[float, ...]]:
    """Generates processed values for the labeled intervals in a textgrid

    Args:
        dataList: time series of (time, pitch, intensity) tuples
        tgFN: the textgrid file to open
        tierName: the interval tier whose intervals are measured
        doPitch: if True get pitch measures; if False get rms intensity
        medianFilterWindowSize: forwarded to getPitchMeasures(); if None,
            no filtering is done
        globalZNormalization: if True, values are normalized with the mean
            and stdDev of the data in dataList
        localZNormalizationWindowSize: if greater than 0, the output values
            are normalized with the mean and stdDev of the local context
            (for a window of 5, it would consider the current value,
            2 values before and 2 values after)

    Returns:
        One tuple per interval: the output of getPitchMeasures() when
        doPitch is True, otherwise a 1-tuple holding the rms intensity
        (0.0 if the interval has no usable values).

    Raises:
        errors.NormalizationException: if both global and local
            normalization are requested
        errors.IncompatibleTierError: if the tier is not an interval tier
        errors.UnexpectedError: if local normalization changes the number
            of values
    """

    # Normalizing a second time would nullify the first normalization
    if globalZNormalization is True and localZNormalizationWindowSize > 0:
        raise errors.NormalizationException()

    timeSeries = cast(List[Tuple[float, ...]], dataList)
    if globalZNormalization is True:
        # Column 1 holds pitch; column 2 holds intensity
        valueColumn = 1 if doPitch else 2
        timeSeries = my_math.znormalizeSpeakerData(timeSeries, valueColumn, True)

    # Only raw values have zeros removed; normalized values are centered
    # around 0, so zeros there are meaningful
    filterZeroFlag = not globalZNormalization

    tg = textgrid.openTextgrid(tgFN, False)
    candidateTier = tg.getTier(tierName)
    if not isinstance(candidateTier, textgrid.IntervalTier):
        raise errors.IncompatibleTierError(candidateTier)
    tier = cast(textgrid.IntervalTier, candidateTier)

    measureRows: List[List[float]] = []
    for interval, entries in tier.getValuesInIntervals(timeSeries):
        # NOTE(review): the first field of the interval is what gets reported
        # as the "label" in getPitchMeasures() messages -- confirm it is the
        # intended field
        label = interval[0]

        if doPitch:
            pitchVals = [pitchVal for _, pitchVal, _ in entries]
            pitchMeasures = getPitchMeasures(
                pitchVals, tgFN, label, medianFilterWindowSize, filterZeroFlag
            )
            measureRows.append(list(pitchMeasures))
            continue

        intensityVals = [intensityVal for _, _, intensityVal in entries]
        if filterZeroFlag:
            intensityVals = [val for val in intensityVals if val != 0.0]

        rmsIntensity = my_math.rms(intensityVals) if len(intensityVals) != 0 else 0.0
        measureRows.append([rmsIntensity])

    # Locally normalize the output, one measure (column) at a time
    if localZNormalizationWindowSize > 0 and len(measureRows) > 0:
        for colI in range(len(measureRows[0])):
            normalizedColumn = my_math.znormWindowFilter(
                [row[colI] for row in measureRows],
                localZNormalizationWindowSize,
                True,
                True,
            )
            # This should hopefully not happen
            if len(normalizedColumn) != len(measureRows):
                raise errors.UnexpectedError(
                    "Lists must be of the same length but are not: "
                    f"({len(normalizedColumn)}), ({len(measureRows)})"
                )

            for rowI, normalizedVal in enumerate(normalizedColumn):
                measureRows[rowI][colI] = normalizedVal

    return [tuple(row) for row in measureRows]

Generates processed values for the labeled intervals in a textgrid

Arguments:
  • doPitch: if True get pitch measures; if False get rms intensity
  • medianFilterWindowSize: if None, no filtering is done
  • globalZNormalization: if True, values are normalized with the mean and stdDev of the data in dataList
  • localZNormalizationWindowSize: if greater than 0, values are normalized with the mean and stdDev of the local context (for a window of 5, it would consider the current value, 2 values before and 2 values after)
def getPitchMeasures( f0Values: List[float], name: str = None, label: str = None, medianFilterWindowSize: int = None, filterZeroFlag: bool = False) -> Tuple[float, float, float, float, float, float]:
def getPitchMeasures(
    f0Values: List[float],
    name: str = None,
    label: str = None,
    medianFilterWindowSize: int = None,
    filterZeroFlag: bool = False,
) -> Tuple[float, float, float, float, float, float]:
    """Summarize a list of pitch values (mean, max, min, range, etc)

    Args:
        f0Values: the pitch values to summarize
        name: the name of the file; only used in the "no pitch data" notice
        label: the label of the current interval; only used in the
            "no pitch data" notice
        medianFilterWindowSize: if None, there is no median filtering;
            otherwise the values are median filtered (with edge padding)
            before anything else happens
        filterZeroFlag: if True, values that int() truncates to zero are
            removed (after any median filtering)

    Returns:
        (mean, max, min, range, variance, standard deviation).  The variance
        divides by the number of values.  If no values remain, a notice is
        printed and every measure is 0.0.
    """
    name = UNSPECIFIED if name is None else name
    label = UNSPECIFIED if label is None else label

    pitchVals = f0Values
    if medianFilterWindowSize is not None:
        pitchVals = my_math.medianFilter(
            pitchVals, medianFilterWindowSize, useEdgePadding=True
        )

    if filterZeroFlag:
        pitchVals = [pitchVal for pitchVal in pitchVals if int(pitchVal) != 0]

    # Nothing to measure: report it and return all-zero measures
    if len(pitchVals) == 0:
        notice = f"No pitch data for file: {name}, label: {label}"
        print(notice.encode("ascii", "replace"))
        return (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

    numVals = float(len(pitchVals))
    meanF0 = sum(pitchVals) / numVals
    maxF0 = max(pitchVals)
    minF0 = min(pitchVals)
    variance = sum((pitchVal - meanF0) ** 2 for pitchVal in pitchVals) / numVals

    return (meanF0, maxF0, minF0, maxF0 - minF0, variance, math.sqrt(variance))

Get various measures (min, max, etc) for the passed in list of pitch values

Arguments:
  • name: the name of the file.
  • label: the label of the current interval.
  • medianFilterWindowSize: if None, there is no median filtering
  • filterZeroFlag: if True, values of zero are removed
def detectPitchErrors( pitchList: List[Tuple[float, float]], maxJumpThreshold: float = 0.7, tgToMark: Optional[praatio.data_classes.textgrid.Textgrid] = None) -> Tuple[List[praatio.utilities.constants.Point], Optional[praatio.data_classes.textgrid.Textgrid]]:
def detectPitchErrors(
    pitchList: List[Tuple[float, float]],
    maxJumpThreshold: float = 0.70,
    tgToMark: Optional[textgrid.Textgrid] = None,
) -> Tuple[List[Point], Optional[textgrid.Textgrid]]:
    """Detect pitch halving and doubling errors.

    Every pitch value is compared against the one before it.  A point is
    recorded when the previous value is at or below
    (current * maxJumpThreshold) or at or above (current / maxJumpThreshold).
    Each point is placed at the time of the current sample and labeled with
    the ratio current / previous.

    If a textgrid is passed in, it adds the markings to the textgrid
    (the textgrid is modified in place).

    Args:
        pitchList: (time, pitch) tuples, in the order they should be compared
        maxJumpThreshold: a value between 0 and 1 (inclusive).  With a value
            of 0 there is no upper cutoff.
        tgToMark: if not None, a point tier holding the detected errors is
            added to this textgrid

    Returns:
        (the list of detected error points, tgToMark)

    Raises:
        errors.ArgumentError: if maxJumpThreshold is outside of [0, 1] or
            if tgToMark already contains the pitch error tier
    """
    if maxJumpThreshold < 0 or maxJumpThreshold > 1:
        raise errors.ArgumentError(
            f"'maxJumpThreshold' must be between 0 and 1.  Was given ({maxJumpThreshold})"
        )

    tierName = _PITCH_ERROR_TIER_NAME
    if tgToMark is not None and tierName in tgToMark.tierNames:
        raise errors.ArgumentError(
            f"Tier name '{tierName}' is already in provided textgrid"
        )

    errorList = []
    for previous, current in zip(pitchList, pitchList[1:]):
        lastPitch = previous[1]
        currentPitch = current[1]

        floorCutoff = currentPitch * maxJumpThreshold
        # A threshold of 0 is accepted above; dividing by it would raise,
        # so treat the upper cutoff as unbounded instead
        if maxJumpThreshold == 0:
            ceilingCutoff = math.inf
        else:
            ceilingCutoff = currentPitch / maxJumpThreshold

        if (lastPitch <= floorCutoff) or (lastPitch >= ceilingCutoff):
            if lastPitch != 0:
                jumpRatio = currentPitch / lastPitch
            elif currentPitch == 0:
                # Two zero values in a row: no jump took place
                continue
            else:
                # Previous value was zero (e.g. an undefined frame mapped
                # to 0); the ratio is unbounded rather than an exception
                jumpRatio = math.copysign(math.inf, currentPitch)

            errorList.append(Point(current[0], str(jumpRatio)))

    if tgToMark is not None:
        pointTier = textgrid.PointTier(
            tierName, errorList, tgToMark.minTimestamp, tgToMark.maxTimestamp
        )
        tgToMark.addTier(pointTier)

    return errorList, tgToMark

Detect pitch halving and doubling errors.

If a textgrid is passed in, it adds the markings to the textgrid