1# 2# Copyright 2024 Google LLC 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17import array 18import ctypes 19import enum 20import glob 21import os 22 23from ctypes import c_bool, c_byte, c_int, c_uint, c_size_t, c_void_p 24from ctypes.util import find_library 25 26 27class _Base: 28 29 def __init__(self, frame_duration, samplerate, nchannels, **kwargs): 30 31 self.hrmode = False 32 self.dt_us = int(frame_duration * 1000) 33 self.sr_hz = int(samplerate) 34 self.sr_pcm_hz = self.sr_hz 35 self.nchannels = nchannels 36 37 libpath = None 38 39 for k in kwargs.keys(): 40 if k == 'hrmode': 41 self.hrmode = bool(kwargs[k]) 42 elif k == 'pcm_samplerate': 43 self.sr_pcm_hz = int(kwargs[k]) 44 elif k == 'libpath': 45 libpath = kwargs[k] 46 else: 47 raise ValueError("Invalid keyword argument: " + k) 48 49 if self.dt_us not in [2500, 5000, 7500, 10000]: 50 raise ValueError( 51 "Invalid frame duration: %.1f ms" % frame_duration) 52 53 allowed_samplerate = [8000, 16000, 24000, 32000, 48000] \ 54 if not self.hrmode else [48000, 96000] 55 56 if self.sr_hz not in allowed_samplerate: 57 raise ValueError("Invalid sample rate: %d Hz" % samplerate) 58 59 if libpath is None: 60 mesonpy_lib = glob.glob(os.path.join(os.path.dirname(__file__), '.lc3.mesonpy.libs', '*lc3*')) 61 62 if mesonpy_lib: 63 libpath = mesonpy_lib[0] 64 else: 65 libpath = find_library("lc3") 66 if not libpath: 67 raise Exception("LC3 library not found") 68 69 lib = ctypes.cdll.LoadLibrary(libpath) 70 71 try: 72 lib.lc3_hr_frame_samples \ 73 and lib.lc3_hr_frame_block_bytes \ 74 and lib.lc3_hr_resolve_bitrate \ 75 and lib.lc3_hr_delay_samples 76 77 except AttributeError: 78 79 if self.hrmode: 80 raise Exception('High-Resolution interface not available') 81 82 lib.lc3_hr_frame_samples = \ 83 lambda hrmode, dt_us, sr_hz: \ 84 lib.lc3_frame_samples(dt_us, sr_hz) 85 86 lib.lc3_hr_frame_block_bytes = \ 87 lambda hrmode, dt_us, sr_hz, nchannels, bitrate: \ 88 nchannels * lib.lc3_frame_bytes(dt_us, bitrate // 2) 89 90 lib.lc3_hr_resolve_bitrate = \ 91 lambda hrmode, dt_us, sr_hz, nbytes: \ 92 lib.lc3_resolve_bitrate(dt_us, nbytes) 93 94 lib.lc3_hr_delay_samples = \ 95 lambda hrmode, dt_us, sr_hz: \ 96 lib.lc3_delay_samples(dt_us, sr_hz) 97 98 lib.lc3_hr_frame_samples.argtypes = [c_bool, c_int, c_int] 99 lib.lc3_hr_frame_block_bytes.argtypes = \ 100 [c_bool, c_int, c_int, c_int, c_int] 101 lib.lc3_hr_resolve_bitrate.argtypes = [c_bool, c_int, c_int, c_int] 102 lib.lc3_hr_delay_samples.argtypes = [c_bool, c_int, c_int] 103 self.lib = lib 104 105 libc = ctypes.cdll.LoadLibrary(find_library("c")) 106 107 self.malloc = libc.malloc 108 self.malloc.argtypes = [c_size_t] 109 self.malloc.restype = c_void_p 110 111 self.free = libc.free 112 self.free.argtypes = [c_void_p] 113 114 def get_frame_samples(self): 115 """ 116 Returns the number of PCM samples in an LC3 frame 117 """ 118 ret = self.lib.lc3_hr_frame_samples( 119 self.hrmode, self.dt_us, self.sr_pcm_hz) 120 if ret < 0: 121 raise ValueError("Bad parameters") 122 return ret 123 124 def get_frame_bytes(self, bitrate): 125 """ 126 Returns the size of LC3 frame blocks, from bitrate in bit per seconds. 127 A target `bitrate` equals 0 or `INT32_MAX` returns respectively 128 the minimum and maximum allowed size. 129 """ 130 ret = self.lib.lc3_hr_frame_block_bytes( 131 self.hrmode, self.dt_us, self.sr_hz, self.nchannels, bitrate) 132 if ret < 0: 133 raise ValueError("Bad parameters") 134 return ret 135 136 def resolve_bitrate(self, nbytes): 137 """ 138 Returns the bitrate in bits per seconds, from the size of LC3 frames. 139 """ 140 ret = self.lib.lc3_hr_resolve_bitrate( 141 self.hrmode, self.dt_us, self.sr_hz, nbytes) 142 if ret < 0: 143 raise ValueError("Bad parameters") 144 return ret 145 146 def get_delay_samples(self): 147 """ 148 Returns the algorithmic delay, as a number of samples. 149 """ 150 ret = self.lib.lc3_hr_delay_samples( 151 self.hrmode, self.dt_us, self.sr_pcm_hz) 152 if ret < 0: 153 raise ValueError("Bad parameters") 154 return ret 155 156 @staticmethod 157 def _resolve_pcm_format(bitdepth): 158 PCM_FORMAT_S16 = 0 159 PCM_FORMAT_S24 = 1 160 PCM_FORMAT_S24_3LE = 2 161 PCM_FORMAT_FLOAT = 3 162 163 match bitdepth: 164 case 16: return (PCM_FORMAT_S16, ctypes.c_int16) 165 case 24: return (PCM_FORMAT_S24_3LE, 3 * ctypes.c_byte) 166 case None: return (PCM_FORMAT_FLOAT, ctypes.c_float) 167 case _: raise ValueError("Could not interpret PCM bitdepth") 168 169 170class Encoder(_Base): 171 """ 172 LC3 Encoder wrapper 173 174 The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5 175 or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000 176 or 48000, unless High-Resolution mode is enabled. In High-Resolution mode, 177 the `samplerate` is 48000 or 96000. 178 179 By default, one channel is processed. When `nchannels` is greater than one, 180 the PCM input stream is read interleaved and consecutives LC3 frames are 181 output, for each channel. 182 183 Keyword arguments: 184 hrmode : Enable High-Resolution mode, default is `False`. 185 sr_pcm_hz : Input PCM samplerate, enable downsampling of input. 186 libpath : LC3 library path and name 187 """ 188 189 class c_encoder_t(c_void_p): 190 pass 191 192 def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs): 193 194 super().__init__(frame_duration, samplerate, nchannels, **kwargs) 195 196 lib = self.lib 197 198 try: 199 lib.lc3_hr_encoder_size \ 200 and lib.lc3_hr_setup_encoder 201 202 except AttributeError: 203 204 assert not self.hrmode 205 206 lib.lc3_hr_encoder_size = \ 207 lambda hrmode, dt_us, sr_hz: \ 208 lib.lc3_encoder_size(dt_us, sr_hz) 209 210 lib.lc3_hr_setup_encoder = \ 211 lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \ 212 lib.lc3_setup_encoder(dt_us, sr_hz, sr_pcm_hz, mem) 213 214 lib.lc3_hr_encoder_size.argtypes = [c_bool, c_int, c_int] 215 lib.lc3_hr_encoder_size.restype = c_uint 216 217 lib.lc3_hr_setup_encoder.argtypes = \ 218 [c_bool, c_int, c_int, c_int, c_void_p] 219 lib.lc3_hr_setup_encoder.restype = self.c_encoder_t 220 221 lib.lc3_encode.argtypes = \ 222 [self.c_encoder_t, c_int, c_void_p, c_int, c_int, c_void_p] 223 224 def new_encoder(): return lib.lc3_hr_setup_encoder( 225 self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, 226 self.malloc(lib.lc3_hr_encoder_size( 227 self.hrmode, self.dt_us, self.sr_pcm_hz))) 228 229 self.__encoders = [new_encoder() for i in range(nchannels)] 230 231 def __del__(self): 232 233 try: 234 (self.free(encoder) for encoder in self.__encoders) 235 finally: 236 return 237 238 def encode(self, pcm, nbytes, bitdepth=None): 239 """ 240 Encode LC3 frame(s), for each channel. 241 242 The `pcm` input is given in two ways. When no `bitdepth` is defined, 243 it's a vector of floating point values from -1 to 1, coding the sample 244 levels. When `bitdepth` is defined, `pcm` is interpreted as a byte-like 245 object, each sample coded on `bitdepth` bits (16 or 24). 246 The machine endianness, or little endian, is used for 16 or 24 bits 247 width, respectively. 248 In both cases, the `pcm` vector data is padded with zeros when 249 its length is less than the required input samples for the encoder. 250 Channels concatenation of encoded LC3 frames, of `nbytes`, is returned. 251 """ 252 253 nchannels = self.nchannels 254 frame_samples = self.get_frame_samples() 255 256 (pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth) 257 pcm_len = nchannels * frame_samples 258 259 if bitdepth is None: 260 pcm_buffer = array.array('f', pcm) 261 262 # Invert test to catch NaN 263 if not abs(sum(pcm)) / frame_samples < 2: 264 raise ValueError("Out of range PCM input") 265 266 padding = max(pcm_len - frame_samples, 0) 267 pcm_buffer.extend(array.array('f', [0] * padding)) 268 269 else: 270 padding = max(pcm_len * ctypes.sizeof(pcm_t) - len(pcm), 0) 271 pcm_buffer = bytearray(pcm) + bytearray(padding) 272 273 data_buffer = (c_byte * nbytes)() 274 data_offset = 0 275 276 for (ich, encoder) in enumerate(self.__encoders): 277 278 pcm_offset = ich * ctypes.sizeof(pcm_t) 279 pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) 280 281 data_size = nbytes // nchannels + int(ich < nbytes % nchannels) 282 data = (c_byte * data_size).from_buffer(data_buffer, data_offset) 283 data_offset += data_size 284 285 ret = self.lib.lc3_encode( 286 encoder, pcm_fmt, pcm, nchannels, len(data), data) 287 if ret < 0: 288 raise ValueError("Bad parameters") 289 290 return bytes(data_buffer) 291 292 293class Decoder(_Base): 294 """ 295 LC3 Decoder wrapper 296 297 The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5 298 or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000 299 or 48000, unless High-Resolution mode is enabled. In High-Resolution 300 mode, the `samplerate` is 48000 or 96000. 301 302 By default, one channel is processed. When `nchannels` is greater than one, 303 the PCM input stream is read interleaved and consecutives LC3 frames are 304 output, for each channel. 305 306 Keyword arguments: 307 hrmode : Enable High-Resolution mode, default is `False`. 308 sr_pcm_hz : Output PCM samplerate, enable upsampling of output. 309 libpath : LC3 library path and name 310 """ 311 312 class c_decoder_t(c_void_p): 313 pass 314 315 def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs): 316 317 super().__init__(frame_duration, samplerate, nchannels, **kwargs) 318 319 lib = self.lib 320 321 try: 322 lib.lc3_hr_decoder_size \ 323 and lib.lc3_hr_setup_decoder 324 325 except AttributeError: 326 327 assert not self.hrmode 328 329 lib.lc3_hr_decoder_size = \ 330 lambda hrmode, dt_us, sr_hz: \ 331 lib.lc3_decoder_size(dt_us, sr_hz) 332 333 lib.lc3_hr_setup_decoder = \ 334 lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \ 335 lib.lc3_setup_decoder(dt_us, sr_hz, sr_pcm_hz, mem) 336 337 lib.lc3_hr_decoder_size.argtypes = [c_bool, c_int, c_int] 338 lib.lc3_hr_decoder_size.restype = c_uint 339 340 lib.lc3_hr_setup_decoder.argtypes = \ 341 [c_bool, c_int, c_int, c_int, c_void_p] 342 lib.lc3_hr_setup_decoder.restype = self.c_decoder_t 343 344 lib.lc3_decode.argtypes = \ 345 [self.c_decoder_t, c_void_p, c_int, c_int, c_void_p, c_int] 346 347 def new_decoder(): return lib.lc3_hr_setup_decoder( 348 self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, 349 self.malloc(lib.lc3_hr_decoder_size( 350 self.hrmode, self.dt_us, self.sr_pcm_hz))) 351 352 self.__decoders = [new_decoder() for i in range(nchannels)] 353 354 def __del__(self): 355 356 try: 357 (self.free(decoder) for decoder in self.__decoders) 358 finally: 359 return 360 361 def decode(self, data, bitdepth=None): 362 """ 363 Decode an LC3 frame 364 365 The input `data` is the channels concatenation of LC3 frames in a 366 byte-like object. Interleaved PCM samples are returned according to 367 the `bitdepth` indication. 368 When no `bitdepth` is defined, it's a vector of floating point values 369 from -1 to 1, coding the sample levels. When `bitdepth` is defined, 370 it returns a byte array, each sample coded on `bitdepth` bits. 371 The machine endianness, or little endian, is used for 16 or 24 bits 372 width, respectively. 373 """ 374 375 nchannels = self.nchannels 376 frame_samples = self.get_frame_samples() 377 378 (pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth) 379 pcm_len = nchannels * self.get_frame_samples() 380 pcm_buffer = (pcm_t * pcm_len)() 381 382 data_buffer = bytearray(data) 383 data_offset = 0 384 385 for (ich, decoder) in enumerate(self.__decoders): 386 pcm_offset = ich * ctypes.sizeof(pcm_t) 387 pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) 388 389 data_size = len(data_buffer) // nchannels + \ 390 int(ich < len(data_buffer) % nchannels) 391 data = (c_byte * data_size).from_buffer(data_buffer, data_offset) 392 data_offset += data_size 393 394 ret = self.lib.lc3_decode( 395 decoder, data, len(data), pcm_fmt, pcm, self.nchannels) 396 if ret < 0: 397 raise ValueError("Bad parameters") 398 399 return array.array('f', pcm_buffer) \ 400 if bitdepth is None else bytes(pcm_buffer) 401