xref: /btstack/test/sbc/sbc_encoder.py (revision 6ccd8248590f666db07dd7add13fecb4f5664fb5)
1#!/usr/bin/env python3
2import numpy as np
3import wave
4import struct
5import sys
6from sbc import *
7
# Per-channel sample history used by sbc_frame_analysis_sig(): one row per
# channel, 80 entries each (10 * 8, the longest window that function indexes).
X = np.zeros((2, 80), dtype=np.int16)

# Name of the analysis routine dispatched by sbc_frame_analysis().
implementation = "SIG"

# Non-zero makes sbc_quantization() emit the 0xad syncword instead of 0x9c;
# overwritten from the command line when run as a script.
msbc_enabled = 0

# Running sum of the time spent in analysis, maintained by sbc_frame_analysis().
total_time_ms = 0
12
def fetch_samples_for_next_sbc_frame(fin, frame):
    """Read the PCM samples for one SBC frame from ``fin`` into ``frame.pcm``.

    Requests ``frame.nr_blocks * frame.nr_subbands`` audio frames through
    ``fin.readframes()`` and decodes the returned bytes as 16-bit signed
    samples in native byte order.

    With two channels the samples are treated as interleaved pairs: even
    positions go to ``frame.pcm[0]`` and odd positions to ``frame.pcm[1]``.
    Otherwise every sample is stored in ``frame.pcm[0]``.

    When fewer bytes than requested come back, only the leading entries of
    ``frame.pcm`` are overwritten; the remaining entries keep their values.
    """
    raw_data = fin.readframes(frame.nr_blocks * frame.nr_subbands)

    # Integer division is required here: the count is used both in the
    # struct format and as a range() bound, and range() rejects floats.
    nr_samples = len(raw_data) // 2
    data = struct.unpack("%ih" % nr_samples, raw_data)

    if frame.nr_channels == 2:
        for i in range(nr_samples // 2):
            frame.pcm[0][i] = data[2 * i]
            frame.pcm[1][i] = data[2 * i + 1]
    else:
        for i, sample in enumerate(data):
            frame.pcm[0][i] = sample
25
26
def sbc_frame_analysis_sig(frame, ch, blk, C):
    """Compute the subband samples of one block for channel ``ch``.

    The ``frame.nr_subbands`` new input samples in ``frame.EX`` are pushed
    into the module-level history ``X[ch]``. The first ``10 * nr_subbands``
    history entries are then weighted with the coefficients ``C``, folded
    into ``2 * nr_subbands`` partial sums and projected onto a cosine matrix.
    The resulting values are written to ``frame.sb_sample[blk][ch]``.
    """
    nr_subbands = frame.nr_subbands
    window_len = 10 * nr_subbands
    fold_len = 2 * nr_subbands
    history = X[ch]  # view: writes below update the shared history buffer

    # Age the history by one block. The copy makes the overlapping move
    # explicit: every entry receives the *old* value nr_subbands places back.
    history[nr_subbands:window_len] = history[:window_len - nr_subbands].copy()

    # Insert the new samples in reversed order at the front of the history.
    for n in range(nr_subbands):
        history[nr_subbands - 1 - n] = frame.EX[n]

    # Weight the history with the window coefficients.
    windowed = np.zeros(window_len)
    windowed[:] = history[:window_len] * np.asarray(C)[:window_len]

    # Fold the five consecutive segments of the window onto each other.
    folded = np.zeros(fold_len)
    for segment in range(5):
        folded += windowed[segment * fold_len:(segment + 1) * fold_len]

    # Cosine matrix, evaluated entry by entry.
    cosines = np.array([
        [np.cos((sb + 0.5) * (k - nr_subbands / 2) * np.pi / nr_subbands)
         for k in range(fold_len)]
        for sb in range(nr_subbands)
    ])

    # Accumulate column by column so each output sums its terms in
    # ascending k order.
    subband = np.zeros(nr_subbands)
    for k in range(fold_len):
        subband += cosines[:, k] * folded[k]

    for sb, value in enumerate(subband):
        frame.sb_sample[blk][ch][sb] = value
58
59
def sbc_frame_analysis(frame, ch, blk, proto_table):
    """Run the configured analysis routine for one block and time it.

    Dispatches on the module-level ``implementation`` name. Only ``"SIG"``
    is available; any other value prints a message and terminates the
    process with exit status 1.

    The elapsed time, measured with ``time_ms()`` around the call, is added
    to the module-level ``total_time_ms`` counter.
    """
    global total_time_ms

    t1 = time_ms()
    if implementation == "SIG":
        sbc_frame_analysis_sig(frame, ch, blk, proto_table)
    else:
        print("Analysis %s not implemented" % implementation)
        # sys.exit() instead of the bare exit() helper: exit() is injected
        # by the site module for interactive sessions and is not guaranteed
        # to exist (e.g. when the interpreter is started with -S).
        sys.exit(1)

    t2 = time_ms()
    total_time_ms += t2 - t1
72
73
def sbc_analysis(frame):
    """Fill ``frame.sb_sample`` by analysing every block of every channel.

    Chooses the coefficient table for 4 or 8 subbands and allocates
    ``frame.sb_sample`` with shape (nr_blocks, nr_channels, nr_subbands).
    For each channel and block it copies the next ``nr_subbands`` PCM
    samples into ``frame.EX`` and calls ``sbc_frame_analysis()``.

    Returns 0 on success, or -1 when ``frame.nr_subbands`` is neither 4 nor 8.
    """
    nr_subbands = frame.nr_subbands
    if nr_subbands != 4 and nr_subbands != 8:
        return -1
    proto_table = Proto_4_40 if nr_subbands == 4 else Proto_8_80

    # Left uninitialised on purpose: the loops below write every entry.
    frame.sb_sample = np.empty((frame.nr_blocks, frame.nr_channels, nr_subbands))

    for ch in range(frame.nr_channels):
        for blk in range(frame.nr_blocks):
            # Block blk consumes the PCM samples starting at blk * nr_subbands.
            first = blk * nr_subbands
            for sb in range(nr_subbands):
                frame.EX[sb] = frame.pcm[ch][first + sb]
            sbc_frame_analysis(frame, ch, blk, proto_table)
    return 0
91
def sbc_encode(frame, force_channel_mode):
    """Encode one frame: subband analysis followed by quantization.

    Returns the negative status of ``sbc_analysis()`` if analysis fails,
    otherwise the result of ``sbc_quantization()``.
    """
    status = sbc_analysis(frame)
    if status < 0:
        return status
    return sbc_quantization(frame, force_channel_mode)
97
def sbc_quantization(frame, force_channel_mode):
    """Quantize the subband samples of ``frame`` and fill its header fields.

    Steps, in order: determine channel mode and scale factors, run the bit
    allocation, derive the number of quantization levels per channel and
    subband, select the syncword from the module-level ``msbc_enabled``
    flag, compute the CRC, and finally store the quantized values in
    ``frame.audio_sample``.

    Always returns 0.
    """
    global msbc_enabled
    calculate_channel_mode_and_scale_factors(frame, force_channel_mode)
    frame.bits = sbc_bit_allocation(frame)

    # Levels per channel/subband: 2^bits - 1.
    frame.levels = np.zeros((frame.nr_channels, frame.nr_subbands), dtype=np.int32)
    for ch, sb in np.ndindex(frame.nr_channels, frame.nr_subbands):
        frame.levels[ch][sb] = (1 << frame.bits[ch][sb]) - 1

    frame.syncword = 0xad if msbc_enabled else 0x9c

    frame.crc_check = calculate_crc(frame)

    # np.ndindex walks (blk, ch, sb) with the last index varying fastest,
    # i.e. in the same order as three nested loops.
    for blk, ch, sb in np.ndindex(frame.nr_blocks, frame.nr_channels, frame.nr_subbands):
        levels = frame.levels[ch][sb]
        if levels > 0:
            sample = frame.sb_sample[blk][ch][sb]
            scale = frame.scalefactor[ch][sb]
            quantized = np.uint16(((sample * levels / scale + levels) - 1.0)/2.0)
        else:
            # Subbands that received no bits carry no data.
            quantized = 0
        frame.audio_sample[blk][ch][sb] = quantized

    return 0
129
def sbc_write_frame(fout, sbc_encoder_frame):
    """Serialize ``sbc_encoder_frame`` and write the bytes to ``fout``.

    The frame is converted with ``frame_to_bitstream()``; its result is
    packed into a ``bytearray`` and written with ``fout.write()``.
    """
    fout.write(bytearray(frame_to_bitstream(sbc_encoder_frame)))
134
if __name__ == "__main__":
    # Command-line driver: encode a WAV file into an SBC (or mSBC) file
    # written next to the input. Kept at module level so that the
    # assignment to msbc_enabled updates the global read by
    # sbc_quantization().
    usage = '''
    Usage:      ./sbc_encoder.py input.wav blocks subbands bitpool allocation_method[0-LOUDNESS,1-SNR] force_channel_mode[2-STEREO,3-JOINT_STEREO] [0-sbc|1-msbc]
    Example:    ./sbc_encoder.py fanfare.wav 16 4 31 0 0
    '''
    nr_blocks = 0
    nr_subbands = 0

    # Six arguments are mandatory; the trailing sbc/msbc selector is
    # optional (the usage example above omits it).
    if len(sys.argv) < 7:
        print(usage)
        sys.exit(1)

    infile = sys.argv[1]
    if not infile.endswith('.wav'):
        print(usage)
        sys.exit(1)

    try:
        nr_blocks = int(sys.argv[2])
        nr_subbands = int(sys.argv[3])
        bitpool = int(sys.argv[4])
        allocation_method = int(sys.argv[5])
        force_channel_mode = int(sys.argv[6])
        # Default to plain SBC when the optional selector is not given.
        msbc_enabled = int(sys.argv[7]) if len(sys.argv) > 7 else 0
    except ValueError:
        # A non-numeric parameter is a usage error, not a crash.
        print(usage)
        sys.exit(1)

    print("msbc_enabled %d"%msbc_enabled)
    print("force_channel_mode %d"%force_channel_mode)

    # Replace only the trailing extension; str.replace() would also rewrite
    # any other '.wav' occurring earlier in the path.
    stem = infile[:-len('.wav')]
    if msbc_enabled:
        sbcfile = stem + '-encoded.msbc'
        # mSBC uses a fixed configuration that overrides the command line.
        # NOTE(review): nr_channels is forced to 1 without checking the
        # input file; a stereo WAV would be read as interleaved data.
        sampling_frequency = 16000
        nr_channels = 1
        bitpool = 26
        nr_subbands = 8
        allocation_method = 0
        force_channel_mode = 0
    else:
        sbcfile = stem + '-encoded.sbc'

    # A non-positive frame size would never advance the encoding loop.
    nr_samples = nr_blocks * nr_subbands
    if nr_samples <= 0:
        print(usage)
        sys.exit(1)

    subband_frame_count = 0
    audio_frame_count = 0
    try:
        # Context managers close both files on every exit path.
        with wave.open(infile, 'rb') as fin:
            # fetch_samples_for_next_sbc_frame() decodes 16-bit samples only.
            if fin.getsampwidth() != 2:
                print("Only 16-bit PCM WAV input is supported: %s" % infile)
                sys.exit(1)

            if not msbc_enabled:
                sampling_frequency = fin.getframerate()
                nr_channels = fin.getnchannels()
            nr_audio_frames = fin.getnframes()

            with open(sbcfile, 'wb') as fout:
                while audio_frame_count < nr_audio_frames:
                    if subband_frame_count % 200 == 0:
                        print("== Frame %d == "  % (subband_frame_count))

                    sbc_encoder_frame = SBCFrame(nr_blocks, nr_subbands, nr_channels, bitpool, sampling_frequency, allocation_method, force_channel_mode)

                    if subband_frame_count == 0:
                        print(sbc_encoder_frame)
                    fetch_samples_for_next_sbc_frame(fin, sbc_encoder_frame)

                    # Do not serialize a frame the encoder rejected (for
                    # example an unsupported number of subbands).
                    if sbc_encode(sbc_encoder_frame, force_channel_mode) < 0:
                        print("Encoding failed for frame %d" % subband_frame_count)
                        sys.exit(1)
                    sbc_write_frame(fout, sbc_encoder_frame)

                    audio_frame_count += nr_samples
                    subband_frame_count += 1
    except (IOError, wave.Error) as e:
        # Unreadable, missing or malformed input, or an unwritable output.
        print("Error: %s" % e)
        print(usage)
        sys.exit(1)

    print("DONE, WAV file %s encoded into SBC file %s " % (infile, sbcfile))
    if subband_frame_count > 0:
        print("Average analysis time per frame: %d ms/frame" % (total_time_ms/subband_frame_count))
    else:
        print("No frame found")
214
215
216
217
218
219