# Copyright © 2021 Arm Ltd and Contributors. All rights reserved.
# SPDX-License-Identifier: MIT

"""Class used to extract the Mel-frequency cepstral coefficients from a given audio frame."""

import numpy as np
import collections

# Configuration consumed by MFCC; every field is read via ``mfcc_params.<name>``.
MFCCParams = collections.namedtuple(
    'MFCCParams',
    ['sampling_freq', 'num_fbank_bins', 'mel_lo_freq', 'mel_hi_freq',
     'num_mfcc_feats', 'frame_len', 'use_htk_method', 'n_fft'])


class MFCC:
    """Computes MFCC features for single audio frames.

    The mel filter bank and the DCT matrix are built once in the constructor
    and reused by every call to :meth:`mfcc_compute`.
    """

    def __init__(self, mfcc_params):
        """
        Builds the DCT matrix and the dense mel filter bank.

        Args:
            mfcc_params: An MFCCParams tuple describing the feature extraction.
        """
        self.mfcc_params = mfcc_params

        # Constants of the non-HTK mel scale: linear below MIN_LOG_HZ,
        # logarithmic at and above it.
        self.FREQ_STEP = 200.0 / 3
        self.MIN_LOG_HZ = 1000.0
        self.MIN_LOG_MEL = self.MIN_LOG_HZ / self.FREQ_STEP
        self.LOG_STEP = 1.8562979903656 / 27.0

        # Next power of two >= frame_len. np.log2 is exact for powers of two,
        # unlike log(x) / log(2), which can round just above an integer.
        self._frame_len_padded = int(2 ** np.ceil(np.log2(self.mfcc_params.frame_len)))
        self._filter_bank_initialised = False
        self.__frame = np.zeros(self._frame_len_padded)
        self.__buffer = np.zeros(self._frame_len_padded)

        # First/last FFT bin index touched by each mel filter (-1 if none);
        # filled in by create_mel_filter_bank().
        self._filter_bank_filter_first = np.zeros(self.mfcc_params.num_fbank_bins)
        self._filter_bank_filter_last = np.zeros(self.mfcc_params.num_fbank_bins)
        self.__mel_energies = np.zeros(self.mfcc_params.num_fbank_bins)
        self._dct_matrix = self.create_dct_matrix(self.mfcc_params.num_fbank_bins,
                                                  self.mfcc_params.num_mfcc_feats)
        self.__mel_filter_bank = self.create_mel_filter_bank()

        # Dense (num_fbank_bins x n_fft/2 + 1) version of the compact filter
        # bank so that mfcc_compute() can apply it with a single dot product.
        self._np_mel_bank = np.zeros([self.mfcc_params.num_fbank_bins,
                                      int(self.mfcc_params.n_fft / 2) + 1])

        for bin_num in range(self.mfcc_params.num_fbank_bins):
            first = int(self._filter_bank_filter_first[bin_num])
            last = int(self._filter_bank_filter_last[bin_num])
            for offset, column in enumerate(range(first, last + 1)):
                self._np_mel_bank[bin_num, column] = self.__mel_filter_bank[bin_num][offset]

    def mel_scale(self, freq, use_htk_method):
        """
        Gets the mel scale for a particular sample frequency.

        Args:
            freq: The frequency (Hz) to convert.
            use_htk_method: Boolean to set whether to use HTK method or not.

        Returns:
            the mel scale
        """
        if use_htk_method:
            return 1127.0 * np.log(1.0 + freq / 700.0)

        if freq >= self.MIN_LOG_HZ:
            return self.MIN_LOG_MEL + np.log(freq / self.MIN_LOG_HZ) / self.LOG_STEP
        return freq / self.FREQ_STEP

    def inv_mel_scale(self, mel_freq, use_htk_method):
        """
        Gets the sample frequency for a particular mel.

        Args:
            mel_freq: The mel frequency.
            use_htk_method: Boolean to set whether to use HTK method or not.

        Returns:
            the sample frequency
        """
        if use_htk_method:
            return 700.0 * (np.exp(mel_freq / 1127.0) - 1.0)

        if mel_freq >= self.MIN_LOG_MEL:
            return self.MIN_LOG_HZ * np.exp(self.LOG_STEP * (mel_freq - self.MIN_LOG_MEL))
        return self.FREQ_STEP * mel_freq

    def spectrum_calc(self, audio_data):
        """
        Computes the magnitude spectrum of one windowed frame.

        Args:
            audio_data: The frame samples; multiplied element-wise with a
                window of frame_len points.

        Returns:
            the absolute value of the real FFT of the windowed frame, computed
            with n_fft points
        """
        frame_len = self.mfcc_params.frame_len
        # Take the first frame_len points of a (frame_len + 1)-point Hann
        # window, i.e. drop the window's final sample.
        window = np.hanning(frame_len + 1)[0:frame_len]
        return np.abs(np.fft.rfft(window * audio_data, self.mfcc_params.n_fft))

    def log_mel(self, mel_energy):
        """
        Takes the natural logarithm of the mel energies.

        Args:
            mel_energy: The mel energies; left unmodified.

        Returns:
            log(mel_energy + 1e-10)
        """
        # The small offset avoids log(0). It is added out of place so the
        # caller's array is not mutated.
        return np.log(mel_energy + 1e-10)

    def mfcc_compute(self, audio_data):
        """
        Extracts the MFCC for a single frame.

        Args:
            audio_data: The audio data to process.

        Returns:
            the MFCC features

        Raises:
            ValueError: if len(audio_data) differs from the configured frame_len.
        """
        if len(audio_data) != self.mfcc_params.frame_len:
            raise ValueError(
                f"audio_data buffer size {len(audio_data)} does not match frame length {self.mfcc_params.frame_len}")

        audio_data = np.array(audio_data)
        spec = self.spectrum_calc(audio_data)
        mel_energy = np.dot(self._np_mel_bank.astype(np.float32),
                            np.transpose(spec).astype(np.float32))
        log_mel_energy = self.log_mel(mel_energy)
        mfcc_feats = np.dot(self._dct_matrix, log_mel_energy)
        return mfcc_feats

    def create_dct_matrix(self, num_fbank_bins, num_mfcc_feats):
        """
        Creates the Discrete Cosine Transform matrix to be used in the compute function.

        Args:
            num_fbank_bins: The number of filter bank bins
            num_mfcc_feats: the number of MFCC features

        Returns:
            the DCT matrix, of shape (num_mfcc_feats, num_fbank_bins)
        """
        # Entry [k, n] = sqrt(2 / N) * cos(pi / N * (n + 0.5) * k), N = num_fbank_bins.
        # The shape comes from the arguments, not from self.mfcc_params.
        n = np.arange(num_fbank_bins)
        k = np.arange(num_mfcc_feats).reshape(-1, 1)
        return np.sqrt(2 / num_fbank_bins) * np.cos((np.pi / num_fbank_bins) * (n + 0.5) * k)

    def mel_norm(self, weight, right_mel, left_mel):
        """
        Placeholder function over-ridden in child class
        """
        return weight

    def create_mel_filter_bank(self):
        """
        Creates the Mel filter bank.

        As a side effect, records the first and last FFT bin index of every
        filter in _filter_bank_filter_first / _filter_bank_filter_last
        (-1 when no FFT bin falls inside the filter).

        Returns:
            the mel filter bank: a list with one array per filter holding the
            weights for the FFT bins first..last of that filter
        """
        # FFT calculations are greatly accelerated for frame lengths which are powers of 2
        # Frames are padded and FFT bin width/length calculated accordingly
        num_fft_bins = int(self._frame_len_padded / 2)
        fft_bin_width = self.mfcc_params.sampling_freq / self._frame_len_padded
        use_htk = self.mfcc_params.use_htk_method
        num_fbank_bins = self.mfcc_params.num_fbank_bins

        mel_low_freq = self.mel_scale(self.mfcc_params.mel_lo_freq, use_htk)
        mel_high_freq = self.mel_scale(self.mfcc_params.mel_hi_freq, use_htk)
        mel_freq_delta = (mel_high_freq - mel_low_freq) / (num_fbank_bins + 1)

        # The mel value of each FFT bin does not depend on the filter, so it
        # is computed once rather than once per filter.
        bin_mels = [self.mel_scale(fft_bin_width * i, use_htk) for i in range(num_fft_bins)]

        mel_fbank = []
        for bin_num in range(num_fbank_bins):
            left_mel = mel_low_freq + bin_num * mel_freq_delta
            center_mel = mel_low_freq + (bin_num + 1) * mel_freq_delta
            right_mel = mel_low_freq + (bin_num + 2) * mel_freq_delta
            first_index = last_index = -1
            weights = np.zeros(num_fft_bins)

            for i, mel in enumerate(bin_mels):
                if left_mel < mel < right_mel:
                    # Triangular weight: rises from left to centre, falls to right.
                    if mel <= center_mel:
                        weight = (mel - left_mel) / (center_mel - left_mel)
                    else:
                        weight = (right_mel - mel) / (right_mel - center_mel)

                    weights[i] = self.mel_norm(weight, right_mel, left_mel)

                    if first_index == -1:
                        first_index = i
                    last_index = i

            self._filter_bank_filter_first[bin_num] = first_index
            self._filter_bank_filter_last[bin_num] = last_index

            if first_index == -1:
                # No FFT bin lies inside this filter: keep a single zero weight.
                mel_fbank.append(np.zeros(1))
            else:
                mel_fbank.append(weights[first_index:last_index + 1].copy())

        return mel_fbank


class AudioPreprocessor:
    """Turns a block of audio samples into a matrix of per-frame MFCC features."""

    def __init__(self, mfcc, model_input_size, stride):
        """
        Args:
            mfcc: The MFCC instance used to compute the features of each frame.
            model_input_size: The number of frames (rows) in the feature matrix.
            stride: The number of samples to advance between consecutive frames.
        """
        self.model_input_size = model_input_size
        self.stride = stride
        self._mfcc_calc = mfcc

    def _normalize(self, values):
        """
        Normalize values to mean 0 and std 1

        When the standard deviation is zero (constant input) zeros are
        returned instead of dividing by zero.
        """
        centred = values - np.mean(values)
        std = np.std(values)
        if std == 0:
            return np.zeros_like(centred, dtype=float)
        return centred / std

    def _get_features(self, features, mfcc_instance, audio_data):
        """
        Appends per-frame MFCC features to `features` (in place) until it holds
        model_input_size * num_mfcc_feats values.

        Args:
            features: The list to extend with the computed features.
            mfcc_instance: The MFCC instance used to compute each frame.
            audio_data: The samples to slice frames from, `stride` samples apart.
        """
        frame_len = int(mfcc_instance.mfcc_params.frame_len)
        idx = 0
        while len(features) < self.model_input_size * mfcc_instance.mfcc_params.num_mfcc_feats:
            current_frame_feats = mfcc_instance.mfcc_compute(audio_data[idx:idx + frame_len])
            features.extend(current_frame_feats)
            idx += self.stride

    def mfcc_delta_calc(self, features):
        """
        Placeholder function over-ridden in child class
        """
        return features

    def extract_features(self, audio_data):
        """
        Extracts the MFCC features. Also calculates each features first and second order derivatives
        if the mfcc_delta_calc() function has been implemented by a child class.
        The matrix returned should be sized appropriately for input to the model, based
        on the model info specified in the MFCC instance.

        Args:
            audio_data: the audio data to be used for this calculation
        Returns:
            the derived MFCC feature vector, sized appropriately for inference
        Raises:
            ValueError: if audio_data holds fewer samples than one inference needs.
        """
        mfcc_params = self._mfcc_calc.mfcc_params
        num_samples_per_inference = ((self.model_input_size - 1)
                                     * self.stride) + mfcc_params.frame_len

        if len(audio_data) < num_samples_per_inference:
            raise ValueError("audio_data size for feature extraction is smaller than "
                             "the expected number of samples needed for inference")

        features = []
        self._get_features(features, self._mfcc_calc, np.asarray(audio_data))
        features = np.reshape(np.array(features), (self.model_input_size, mfcc_params.num_mfcc_feats))
        features = self.mfcc_delta_calc(features)
        return np.float32(features)