xref: /btstack/3rd-party/lc3-google/python/lc3.py (revision 25d5427a345779b159b63c0d8b197d2dee40cf37)
1#
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import array
18import ctypes
19import enum
20import glob
21import os
22
23from ctypes import c_bool, c_byte, c_int, c_uint, c_size_t, c_void_p
24from ctypes.util import find_library
25
26
27class _Base:
28
29    def __init__(self, frame_duration, samplerate, nchannels, **kwargs):
30
31        self.hrmode = False
32        self.dt_us = int(frame_duration * 1000)
33        self.sr_hz = int(samplerate)
34        self.sr_pcm_hz = self.sr_hz
35        self.nchannels = nchannels
36
37        libpath = None
38
39        for k in kwargs.keys():
40            if k == 'hrmode':
41                self.hrmode = bool(kwargs[k])
42            elif k == 'pcm_samplerate':
43                self.sr_pcm_hz = int(kwargs[k])
44            elif k == 'libpath':
45                libpath = kwargs[k]
46            else:
47                raise ValueError("Invalid keyword argument: " + k)
48
49        if self.dt_us not in [2500, 5000, 7500, 10000]:
50            raise ValueError(
51                "Invalid frame duration: %.1f ms" % frame_duration)
52
53        allowed_samplerate = [8000, 16000, 24000, 32000, 48000] \
54            if not self.hrmode else [48000, 96000]
55
56        if self.sr_hz not in allowed_samplerate:
57            raise ValueError("Invalid sample rate: %d Hz" % samplerate)
58
59        if libpath is None:
60            mesonpy_lib = glob.glob(os.path.join(os.path.dirname(__file__), '.lc3.mesonpy.libs', '*lc3*'))
61
62            if mesonpy_lib:
63                libpath = mesonpy_lib[0]
64            else:
65                libpath = find_library("lc3")
66            if not libpath:
67                raise Exception("LC3 library not found")
68
69        lib = ctypes.cdll.LoadLibrary(libpath)
70
71        try:
72            lib.lc3_hr_frame_samples \
73                and lib.lc3_hr_frame_block_bytes \
74                and lib.lc3_hr_resolve_bitrate \
75                and lib.lc3_hr_delay_samples
76
77        except AttributeError:
78
79            if self.hrmode:
80                raise Exception('High-Resolution interface not available')
81
82            lib.lc3_hr_frame_samples = \
83                lambda hrmode, dt_us, sr_hz: \
84                lib.lc3_frame_samples(dt_us, sr_hz)
85
86            lib.lc3_hr_frame_block_bytes = \
87                lambda hrmode, dt_us, sr_hz, nchannels, bitrate: \
88                nchannels * lib.lc3_frame_bytes(dt_us, bitrate // 2)
89
90            lib.lc3_hr_resolve_bitrate = \
91                lambda hrmode, dt_us, sr_hz, nbytes: \
92                lib.lc3_resolve_bitrate(dt_us, nbytes)
93
94            lib.lc3_hr_delay_samples = \
95                lambda hrmode, dt_us, sr_hz: \
96                lib.lc3_delay_samples(dt_us, sr_hz)
97
98        lib.lc3_hr_frame_samples.argtypes = [c_bool, c_int, c_int]
99        lib.lc3_hr_frame_block_bytes.argtypes = \
100            [c_bool, c_int, c_int, c_int, c_int]
101        lib.lc3_hr_resolve_bitrate.argtypes = [c_bool, c_int, c_int, c_int]
102        lib.lc3_hr_delay_samples.argtypes = [c_bool, c_int, c_int]
103        self.lib = lib
104
105        libc = ctypes.cdll.LoadLibrary(find_library("c"))
106
107        self.malloc = libc.malloc
108        self.malloc.argtypes = [c_size_t]
109        self.malloc.restype = c_void_p
110
111        self.free = libc.free
112        self.free.argtypes = [c_void_p]
113
114    def get_frame_samples(self):
115        """
116        Returns the number of PCM samples in an LC3 frame
117        """
118        ret = self.lib.lc3_hr_frame_samples(
119            self.hrmode, self.dt_us, self.sr_pcm_hz)
120        if ret < 0:
121            raise ValueError("Bad parameters")
122        return ret
123
124    def get_frame_bytes(self, bitrate):
125        """
126        Returns the size of LC3 frame blocks, from bitrate in bit per seconds.
127        A target `bitrate` equals 0 or `INT32_MAX` returns respectively
128        the minimum and maximum allowed size.
129        """
130        ret = self.lib.lc3_hr_frame_block_bytes(
131            self.hrmode, self.dt_us, self.sr_hz, self.nchannels, bitrate)
132        if ret < 0:
133            raise ValueError("Bad parameters")
134        return ret
135
136    def resolve_bitrate(self, nbytes):
137        """
138        Returns the bitrate in bits per seconds, from the size of LC3 frames.
139        """
140        ret = self.lib.lc3_hr_resolve_bitrate(
141            self.hrmode, self.dt_us, self.sr_hz, nbytes)
142        if ret < 0:
143            raise ValueError("Bad parameters")
144        return ret
145
146    def get_delay_samples(self):
147        """
148         Returns the algorithmic delay, as a number of samples.
149         """
150        ret = self.lib.lc3_hr_delay_samples(
151            self.hrmode, self.dt_us, self.sr_pcm_hz)
152        if ret < 0:
153            raise ValueError("Bad parameters")
154        return ret
155
156    @staticmethod
157    def _resolve_pcm_format(bitdepth):
158        PCM_FORMAT_S16 = 0
159        PCM_FORMAT_S24 = 1
160        PCM_FORMAT_S24_3LE = 2
161        PCM_FORMAT_FLOAT = 3
162
163        match bitdepth:
164            case 16: return (PCM_FORMAT_S16, ctypes.c_int16)
165            case 24: return (PCM_FORMAT_S24_3LE, 3 * ctypes.c_byte)
166            case None: return (PCM_FORMAT_FLOAT, ctypes.c_float)
167            case _: raise ValueError("Could not interpret PCM bitdepth")
168
169
class Encoder(_Base):
    """
    LC3 Encoder wrapper

    The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5
    or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000
    or 48000, unless High-Resolution mode is enabled. In High-Resolution mode,
    the `samplerate` is 48000 or 96000.

    By default, one channel is processed. When `nchannels` is greater than one,
    the PCM input stream is read interleaved and consecutives LC3 frames are
    output, for each channel.

    Keyword arguments:
        hrmode         : Enable High-Resolution mode, default is `False`.
        pcm_samplerate : Input PCM samplerate, enable downsampling of input.
        libpath        : LC3 library path and name
    """

    class c_encoder_t(c_void_p):
        pass

    def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs):

        super().__init__(frame_duration, samplerate, nchannels, **kwargs)

        lib = self.lib

        try:
            # Attribute access raises when a symbol is not exported.
            lib.lc3_hr_encoder_size \
                and lib.lc3_hr_setup_encoder

        except AttributeError:

            # Explicit check, an `assert` is stripped when running with `-O`.
            if self.hrmode:
                raise Exception(
                    'High-Resolution interface not available') from None

            # The emulation below calls the legacy functions directly, they
            # need their own prototypes to not truncate the pointers.
            lib.lc3_encoder_size.argtypes = [c_int, c_int]
            lib.lc3_encoder_size.restype = c_uint

            lib.lc3_setup_encoder.argtypes = [c_int, c_int, c_int, c_void_p]
            lib.lc3_setup_encoder.restype = self.c_encoder_t

            lib.lc3_hr_encoder_size = \
                lambda hrmode, dt_us, sr_hz: \
                lib.lc3_encoder_size(dt_us, sr_hz)

            lib.lc3_hr_setup_encoder = \
                lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \
                lib.lc3_setup_encoder(dt_us, sr_hz, sr_pcm_hz, mem)

        lib.lc3_hr_encoder_size.argtypes = [c_bool, c_int, c_int]
        lib.lc3_hr_encoder_size.restype = c_uint

        lib.lc3_hr_setup_encoder.argtypes = \
            [c_bool, c_int, c_int, c_int, c_void_p]
        lib.lc3_hr_setup_encoder.restype = self.c_encoder_t

        lib.lc3_encode.argtypes = \
            [self.c_encoder_t, c_int, c_void_p, c_int, c_int, c_void_p]

        # Keep the allocated addresses, to release them whatever the setup
        # function returns.
        self.__buffers = []
        self.__encoders = []

        for _ in range(nchannels):
            mem = self.malloc(lib.lc3_hr_encoder_size(
                self.hrmode, self.dt_us, self.sr_pcm_hz))
            self.__buffers.append(mem)
            self.__encoders.append(lib.lc3_hr_setup_encoder(
                self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, mem))

    def __del__(self):
        """
        Release the memory of the encoder states, on a best effort basis.
        """
        try:
            buffers, self.__buffers = self.__buffers, []
            self.__encoders = []
            for mem in buffers:
                self.free(mem)
        except Exception:
            # The object may be partially constructed (`__init__` raised),
            # or the interpreter may be shutting down: never raise from here.
            pass

    def encode(self, pcm, nbytes, bitdepth=None):
        """
        Encode LC3 frame(s), for each channel.

        The `pcm` input is given in two ways. When no `bitdepth` is defined,
        it's a vector of floating point values from -1 to 1, coding the sample
        levels. When `bitdepth` is defined, `pcm` is interpreted as a byte-like
        object, each sample coded on `bitdepth` bits (16 or 24).
        The machine endianness, or little endian, is used for 16 or 24 bits
        width, respectively.
        In both cases, the `pcm` vector data is padded with zeros when
        its length is less than the required input samples for the encoder.
        Channels concatenation of encoded LC3 frames, of `nbytes`, is returned.

        A `ValueError` is raised on an unsupported `bitdepth`, on out of range
        floating point input, or when the library rejects the parameters.
        """

        nchannels = self.nchannels
        frame_samples = self.get_frame_samples()

        (pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth)
        pcm_len = nchannels * frame_samples

        if bitdepth is None:
            pcm_buffer = array.array('f', pcm)

            # Invert test to catch NaN
            if not abs(sum(pcm_buffer)) / frame_samples < 2:
                raise ValueError("Out of range PCM input")

            # Pad up to the samples required for all the channels
            padding = max(pcm_len - len(pcm_buffer), 0)
            pcm_buffer.extend([0.0] * padding)

        else:
            padding = max(pcm_len * ctypes.sizeof(pcm_t) - len(pcm), 0)
            pcm_buffer = bytearray(pcm) + bytearray(padding)

        data_buffer = (c_byte * nbytes)()
        data_offset = 0

        for (ich, encoder) in enumerate(self.__encoders):

            # View on the interleaved samples, starting at the first sample
            # of the channel; the library strides over `nchannels` samples.
            pcm_offset = ich * ctypes.sizeof(pcm_t)
            pcm_view = (pcm_t * (pcm_len - ich)).from_buffer(
                pcm_buffer, pcm_offset)

            # Share `nbytes` between channels, the remainder going to the
            # first ones.
            frame_size = nbytes // nchannels + int(ich < nbytes % nchannels)
            frame = (c_byte * frame_size).from_buffer(
                data_buffer, data_offset)
            data_offset += frame_size

            ret = self.lib.lc3_encode(
                encoder, pcm_fmt, pcm_view, nchannels, len(frame), frame)
            if ret < 0:
                raise ValueError("Bad parameters")

        return bytes(data_buffer)
291
292
class Decoder(_Base):
    """
    LC3 Decoder wrapper

    The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5
    or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000
    or 48000, unless High-Resolution mode is enabled. In High-Resolution
    mode, the `samplerate` is 48000 or 96000.

    By default, one channel is processed. When `nchannels` is greater than one,
    the consecutive LC3 frames, one for each channel, are read from the input
    and the PCM output stream is written interleaved.

    Keyword arguments:
        hrmode         : Enable High-Resolution mode, default is `False`.
        pcm_samplerate : Output PCM samplerate, enable upsampling of output.
        libpath        : LC3 library path and name
    """

    class c_decoder_t(c_void_p):
        pass

    def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs):

        super().__init__(frame_duration, samplerate, nchannels, **kwargs)

        lib = self.lib

        try:
            # Attribute access raises when a symbol is not exported.
            lib.lc3_hr_decoder_size \
                and lib.lc3_hr_setup_decoder

        except AttributeError:

            # Explicit check, an `assert` is stripped when running with `-O`.
            if self.hrmode:
                raise Exception(
                    'High-Resolution interface not available') from None

            # The emulation below calls the legacy functions directly, they
            # need their own prototypes to not truncate the pointers.
            lib.lc3_decoder_size.argtypes = [c_int, c_int]
            lib.lc3_decoder_size.restype = c_uint

            lib.lc3_setup_decoder.argtypes = [c_int, c_int, c_int, c_void_p]
            lib.lc3_setup_decoder.restype = self.c_decoder_t

            lib.lc3_hr_decoder_size = \
                lambda hrmode, dt_us, sr_hz: \
                lib.lc3_decoder_size(dt_us, sr_hz)

            lib.lc3_hr_setup_decoder = \
                lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \
                lib.lc3_setup_decoder(dt_us, sr_hz, sr_pcm_hz, mem)

        lib.lc3_hr_decoder_size.argtypes = [c_bool, c_int, c_int]
        lib.lc3_hr_decoder_size.restype = c_uint

        lib.lc3_hr_setup_decoder.argtypes = \
            [c_bool, c_int, c_int, c_int, c_void_p]
        lib.lc3_hr_setup_decoder.restype = self.c_decoder_t

        lib.lc3_decode.argtypes = \
            [self.c_decoder_t, c_void_p, c_int, c_int, c_void_p, c_int]

        # Keep the allocated addresses, to release them whatever the setup
        # function returns.
        self.__buffers = []
        self.__decoders = []

        for _ in range(nchannels):
            mem = self.malloc(lib.lc3_hr_decoder_size(
                self.hrmode, self.dt_us, self.sr_pcm_hz))
            self.__buffers.append(mem)
            self.__decoders.append(lib.lc3_hr_setup_decoder(
                self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, mem))

    def __del__(self):
        """
        Release the memory of the decoder states, on a best effort basis.
        """
        try:
            buffers, self.__buffers = self.__buffers, []
            self.__decoders = []
            for mem in buffers:
                self.free(mem)
        except Exception:
            # The object may be partially constructed (`__init__` raised),
            # or the interpreter may be shutting down: never raise from here.
            pass

    def decode(self, data, bitdepth=None):
        """
        Decode an LC3 frame

        The input `data` is the channels concatenation of LC3 frames in a
        byte-like object. Interleaved PCM samples are returned according to
        the `bitdepth` indication.
        When no `bitdepth` is defined, it's a vector of floating point values
        from -1 to 1, coding the sample levels. When `bitdepth` is defined,
        it returns a byte array, each sample coded on `bitdepth` bits.
        The machine endianness, or little endian, is used for 16 or 24 bits
        width, respectively.

        A `ValueError` is raised on an unsupported `bitdepth`, or when the
        library rejects the parameters.
        """

        nchannels = self.nchannels

        (pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth)
        pcm_len = nchannels * self.get_frame_samples()
        pcm_buffer = (pcm_t * pcm_len)()

        data_buffer = bytearray(data)
        data_len = len(data_buffer)
        data_offset = 0

        for (ich, decoder) in enumerate(self.__decoders):

            # View on the interleaved samples, starting at the first sample
            # of the channel; the library strides over `nchannels` samples.
            pcm_offset = ich * ctypes.sizeof(pcm_t)
            pcm_view = (pcm_t * (pcm_len - ich)).from_buffer(
                pcm_buffer, pcm_offset)

            # Split the input between channels, the remainder going to the
            # first ones.
            frame_size = data_len // nchannels + \
                int(ich < data_len % nchannels)
            frame = (c_byte * frame_size).from_buffer(
                data_buffer, data_offset)
            data_offset += frame_size

            ret = self.lib.lc3_decode(
                decoder, frame, len(frame), pcm_fmt, pcm_view, nchannels)
            if ret < 0:
                raise ValueError("Bad parameters")

        return array.array('f', pcm_buffer) \
            if bitdepth is None else bytes(pcm_buffer)
401