xref: /btstack/test/sbc/sbc_encoder.py (revision ef8a7a12f41cfc681f8f5d2d0890863f62c52a4e)
1#!/usr/bin/env python
2import numpy as np
3import wave
4import struct
5import sys
6from sbc import *
7
# Per-channel input history for the analysis filter: one row per channel (two
# rows), 80 int16 entries each. sbc_frame_analysis uses the first
# 10 * nr_subbands entries of a row, i.e. all 80 when nr_subbands is 8.
X = np.zeros((2, 80), dtype=np.int16)
9
def fetch_samples_for_next_sbc_frame(fin, frame):
    """Read the PCM input for one SBC frame from ``fin`` into ``frame.pcm``.

    Requests ``frame.nr_blocks * frame.nr_subbands`` audio frames from the
    reader and de-interleaves them. With two channels, even-indexed samples
    are stored in ``frame.pcm[0]`` and odd-indexed samples in
    ``frame.pcm[1]``; otherwise every sample is stored in ``frame.pcm[0]``.

    Args:
        fin: Reader exposing ``readframes(n)`` that returns raw sample bytes
            (for example ``wave.Wave_read``). The bytes are unpacked as
            native-order signed 16-bit integers, so 16-bit input is assumed.
        frame: Object providing ``nr_blocks``, ``nr_subbands``,
            ``nr_channels`` and an indexable, writable ``pcm`` buffer per
            channel.

    When the reader returns fewer frames than requested (end of input), only
    the leading entries of ``frame.pcm`` are overwritten; the remaining
    entries keep whatever values they already had.
    """
    raw_data = fin.readframes(frame.nr_blocks * frame.nr_subbands)

    # Floor division keeps the counts integral: with true division (Python 3)
    # the stereo loop bound below would be a float and range() would raise.
    nr_values = len(raw_data) // 2
    data = struct.unpack("%ih" % nr_values, raw_data)

    if frame.nr_channels == 2:
        # Interleaved stereo: left, right, left, right, ...
        for i in range(nr_values // 2):
            frame.pcm[0][i] = data[2 * i]
            frame.pcm[1][i] = data[2 * i + 1]
    else:
        for i, sample in enumerate(data):
            frame.pcm[0][i] = sample
22
23
def sbc_frame_analysis(frame, ch, blk, C):
    """Compute the subband samples of one block for one channel.

    The newest ``frame.nr_subbands`` input samples are taken from
    ``frame.EX`` and pushed (newest first) into this channel's row of the
    module-level history buffer ``X``. The first ``10 * nr_subbands`` history
    entries are multiplied by the window coefficients ``C``, folded into
    ``2 * nr_subbands`` partial sums, and combined with a cosine matrix. The
    results are written to ``frame.sb_sample[blk][ch]``.

    Args:
        frame: Frame object providing ``nr_subbands``, ``EX`` and a writable
            ``sb_sample`` array.
        ch: Channel index (row of ``X``).
        blk: Block index within the frame.
        C: Window coefficients, indexable for ``0 .. 10 * nr_subbands - 1``.
    """
    nr_subbands = frame.nr_subbands
    window_len = 10 * nr_subbands
    fold_len = 2 * nr_subbands

    # Row of the module-level buffer; writes below update X in place.
    history = X[ch]

    # Age the history by nr_subbands positions, walking from the far end so
    # that no value is overwritten before it has been moved.
    for dst in reversed(range(nr_subbands, window_len)):
        history[dst] = history[dst - nr_subbands]

    # Insert the new samples in reverse order: EX[0] lands at the highest of
    # the freed positions, the last new sample at index 0.
    for offset in range(nr_subbands):
        history[nr_subbands - 1 - offset] = frame.EX[offset]

    # Apply the window coefficients.
    windowed = np.zeros(window_len)
    for n in range(window_len):
        windowed[n] = history[n] * C[n]

    # Fold the windowed signal: five taps spaced fold_len apart per output.
    folded = np.zeros(fold_len)
    for n in range(fold_len):
        for tap in range(5):
            folded[n] += windowed[n + tap * fold_len]

    # Cosine matrixing of the folded values into one sample per subband.
    subband_out = np.zeros(nr_subbands)
    for sb in range(nr_subbands):
        for k in range(fold_len):
            weight = np.cos((sb + 0.5) * (k - nr_subbands / 2) * np.pi / nr_subbands)
            subband_out[sb] += weight * folded[k]

    for sb, value in enumerate(subband_out):
        frame.sb_sample[blk][ch][sb] = value
55
def sbc_analysis(frame):
    """Run the analysis filter over every block and channel of ``frame``.

    Allocates ``frame.sb_sample`` with shape
    ``(nr_blocks, nr_channels, nr_subbands)`` and fills it by handing
    ``nr_subbands`` consecutive samples of ``frame.pcm[ch]`` at a time
    (through ``frame.EX``) to ``sbc_frame_analysis``.

    Returns:
        0 on success, or -1 when ``frame.nr_subbands`` is neither 4 nor 8
        (in which case ``frame`` is left untouched).
    """
    nr_subbands = frame.nr_subbands
    if nr_subbands != 4 and nr_subbands != 8:
        return -1

    # Window coefficient table matching the subband count.
    C = Proto_4_40 if nr_subbands == 4 else Proto_8_80

    frame.sb_sample = np.ndarray(shape=(frame.nr_blocks, frame.nr_channels, frame.nr_subbands))
    for ch in range(frame.nr_channels):
        channel_pcm = frame.pcm[ch]
        for blk in range(frame.nr_blocks):
            # Each block consumes the next nr_subbands samples of the channel.
            first = blk * nr_subbands
            for sb in range(nr_subbands):
                frame.EX[sb] = channel_pcm[first + sb]
            sbc_frame_analysis(frame, ch, blk, C)
    return 0
73
def sbc_encode(frame, force_channel_mode):
    """Encode ``frame`` in place: subband analysis followed by quantization.

    Quantization is skipped when the analysis stage reports an error.

    Returns:
        The negative code from ``sbc_analysis`` when analysis fails,
        otherwise the result of ``sbc_quantization``.
    """
    status = sbc_analysis(frame)
    if status < 0:
        return status
    return sbc_quantization(frame, force_channel_mode)
79
def sbc_quantization(frame, force_channel_mode):
    """Quantize the subband samples of ``frame`` and fill in its header fields.

    Steps, in order: determine channel mode and scale factors, allocate bits
    per channel/subband, derive the number of quantization levels
    (``2**bits - 1``), set the sync word and CRC, then convert every subband
    sample into ``frame.audio_sample``.

    Args:
        frame: Frame whose ``sb_sample`` has already been computed.
        force_channel_mode: Passed through unchanged to
            ``calculate_channel_mode_and_scale_factors``.

    Returns:
        Always 0.
    """
    calculate_channel_mode_and_scale_factors(frame, force_channel_mode)
    frame.bits = sbc_bit_allocation(frame)

    channels = range(frame.nr_channels)
    subbands = range(frame.nr_subbands)

    # Number of quantization levels per channel/subband: 2**bits - 1.
    frame.levels = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype=np.int32)
    for ch in channels:
        for sb in subbands:
            frame.levels[ch][sb] = (1 << frame.bits[ch][sb]) - 1

    # The CRC is computed only after bits/levels and the sync word are set.
    frame.syncword = 0x9c
    frame.crc_check = calculate_crc(frame)

    for blk in range(frame.nr_blocks):
        for ch in channels:
            for sb in subbands:
                nr_levels = frame.levels[ch][sb]
                if nr_levels <= 0:
                    # No bits allocated to this subband: nothing to encode.
                    frame.audio_sample[blk][ch][sb] = 0
                    continue
                sample = frame.sb_sample[blk][ch][sb]
                scale = frame.scalefactor[ch][sb]
                frame.audio_sample[blk][ch][sb] = np.uint16(((sample * nr_levels / scale + nr_levels) - 1.0)/2.0)

    return 0
106
def sbc_write_frame(fout, sbc_encoder_frame):
    """Serialize one encoded frame and write it to ``fout``.

    The frame is converted with ``frame_to_bitstream``, wrapped in a
    ``bytearray`` and passed to ``fout.write``.
    """
    fout.write(bytearray(frame_to_bitstream(sbc_encoder_frame)))
111
if __name__ == "__main__":
    # Command-line driver: encode a WAV file into '<name>-encoded.sbc'.
    usage = '''
    Usage:      ./sbc_encoder.py input.wav blocks subbands bitpool allocation_method[0-LOUDNESS,1-SNR] force_channel_mode[2-STEREO,3-JOINT_STEREO]
    Example:    ./sbc_encoder.py fanfare.wav 16 4 31 0
    '''

    if len(sys.argv) < 6:
        print(usage)
        sys.exit(1)

    infile = sys.argv[1]
    if not infile.endswith('.wav'):
        print(usage)
        sys.exit(1)
    # Swap only the trailing extension; str.replace would also rewrite any
    # earlier '.wav' that happens to occur in the path.
    sbcfile = infile[:-len('.wav')] + '-encoded.sbc'

    try:
        nr_blocks = int(sys.argv[2])
        nr_subbands = int(sys.argv[3])
        bitpool = int(sys.argv[4])
        allocation_method = int(sys.argv[5])
        force_channel_mode = 0
        # The channel mode is the optional 7th element of sys.argv, so it is
        # only present when more than six elements were supplied.
        if len(sys.argv) > 6:
            force_channel_mode = int(sys.argv[6])
            if force_channel_mode not in (2, 3):
                print(usage)
                sys.exit(1)
    except ValueError:
        # A numeric argument was not an integer.
        print(usage)
        sys.exit(1)

    # A non-positive sample count per frame would never advance the read
    # position and the encode loop below would not terminate.
    if nr_blocks <= 0 or nr_subbands <= 0:
        print(usage)
        sys.exit(1)

    try:
        fin = wave.open(infile, 'rb')
        try:
            nr_channels = fin.getnchannels()
            sampling_frequency = fin.getframerate()
            nr_audio_frames = fin.getnframes()

            subband_frame_count = 0
            audio_frame_count = 0
            nr_samples = nr_blocks * nr_subbands
            with open(sbcfile, 'wb') as fout:
                while audio_frame_count < nr_audio_frames:
                    if subband_frame_count % 200 == 0:
                        print("== Frame %d ==" % (subband_frame_count))

                    sbc_encoder_frame = SBCFrame(nr_blocks, nr_subbands, nr_channels, bitpool, sampling_frequency, allocation_method)
                    fetch_samples_for_next_sbc_frame(fin, sbc_encoder_frame)

                    # sbc_encode reports failure with a negative code (e.g. a
                    # subband count other than 4 or 8); do not write such a frame.
                    if sbc_encode(sbc_encoder_frame, force_channel_mode) < 0:
                        print("ERROR: SBC encoding failed for frame %d" % (subband_frame_count))
                        sys.exit(1)
                    sbc_write_frame(fout, sbc_encoder_frame)

                    audio_frame_count += nr_samples
                    subband_frame_count += 1
        finally:
            # Release the input file even when encoding aborts.
            fin.close()

        print("DONE, WAV file %s encoded into SBC file %s " % (infile, sbcfile))

    except IOError as e:
        print("ERROR: %s" % e)
        print(usage)
        sys.exit(1)
172
173
174
175
176
177