1"""Stuff to parse Sun and NeXT audio files.
2
3An audio file consists of a header followed by the data.  The structure
4of the header is as follows.
5
6        +---------------+
7        | magic word    |
8        +---------------+
9        | header size   |
10        +---------------+
11        | data size     |
12        +---------------+
13        | encoding      |
14        +---------------+
15        | sample rate   |
16        +---------------+
17        | # of channels |
18        +---------------+
19        | info          |
20        |               |
21        +---------------+
22
23The magic word consists of the 4 characters '.snd'.  Apart from the
24info field, all header fields are 4 bytes in size.  They are all
2532-bit unsigned integers encoded in big-endian byte order.
26
27The header size really gives the start of the data.
28The data size is the physical size of the data.  From the other
29parameters the number of frames can be calculated.
30The encoding gives the way in which audio samples are encoded.
31Possible values are listed below.
32The info field currently consists of an ASCII string giving a
33human-readable description of the audio file.  The info field is
34padded with NUL bytes to the header size.
35
36Usage.
37
38Reading audio files:
39        f = sunau.open(file, 'r')
40where file is either the name of a file or an open file pointer.
41The open file pointer must have methods read(), seek(), and close().
When the setpos() and rewind() methods are not used, the seek()
method is not necessary.
44
45This returns an instance of a class with the following public methods:
46        getnchannels()  -- returns number of audio channels (1 for
47                           mono, 2 for stereo)
48        getsampwidth()  -- returns sample width in bytes
49        getframerate()  -- returns sampling frequency
50        getnframes()    -- returns number of audio frames
        getcomptype()   -- returns compression type ('NONE', 'ULAW' or
                           'ALAW')
        getcompname()   -- returns human-readable version of
                           compression type ('not compressed' matches 'NONE')
54        getparams()     -- returns a namedtuple consisting of all of the
55                           above in the above order
56        getmarkers()    -- returns None (for compatibility with the
57                           aifc module)
58        getmark(id)     -- raises an error since the mark does not
59                           exist (for compatibility with the aifc module)
60        readframes(n)   -- returns at most n frames of audio
61        rewind()        -- rewind to the beginning of the audio stream
62        setpos(pos)     -- seek to the specified position
63        tell()          -- return the current position
64        close()         -- close the instance (make it unusable)
65The position returned by tell() and the position given to setpos()
66are compatible and have nothing to do with the actual position in the
67file.
68The close() method is called automatically when the class instance
69is destroyed.
70
71Writing audio files:
72        f = sunau.open(file, 'w')
73where file is either the name of a file or an open file pointer.
74The open file pointer must have methods write(), tell(), seek(), and
75close().
76
77This returns an instance of a class with the following public methods:
78        setnchannels(n) -- set the number of channels
79        setsampwidth(n) -- set the sample width
80        setframerate(n) -- set the frame rate
81        setnframes(n)   -- set the number of frames
82        setcomptype(type, name)
83                        -- set the compression type and the
84                           human-readable compression type
85        setparams(tuple)-- set all parameters at once
86        tell()          -- return current position in output file
        writeframesraw(data)
                        -- write audio frames without patching up the
                           file header
90        writeframes(data)
91                        -- write audio frames and patch up the file header
92        close()         -- patch up the file header and close the
93                           output file
94You should set the parameters before the first writeframesraw or
95writeframes.  The total number of frames does not need to be set,
96but when it is set to the correct value, the header does not have to
97be patched up.
It is best to first set all parameters, possibly including the
compression type, and then write audio frames using writeframesraw.
100When all frames have been written, either call writeframes(b'') or
101close() to patch up the sizes in the header.
102The close() method is called automatically when the class instance
103is destroyed.
104"""
105
106from collections import namedtuple
107import warnings
108
109warnings._deprecated(__name__, remove=(3, 13))
110
111
112_sunau_params = namedtuple('_sunau_params',
113                           'nchannels sampwidth framerate nframes comptype compname')
114
115# from <multimedia/audio_filehdr.h>
116AUDIO_FILE_MAGIC = 0x2e736e64
117AUDIO_FILE_ENCODING_MULAW_8 = 1
118AUDIO_FILE_ENCODING_LINEAR_8 = 2
119AUDIO_FILE_ENCODING_LINEAR_16 = 3
120AUDIO_FILE_ENCODING_LINEAR_24 = 4
121AUDIO_FILE_ENCODING_LINEAR_32 = 5
122AUDIO_FILE_ENCODING_FLOAT = 6
123AUDIO_FILE_ENCODING_DOUBLE = 7
124AUDIO_FILE_ENCODING_ADPCM_G721 = 23
125AUDIO_FILE_ENCODING_ADPCM_G722 = 24
126AUDIO_FILE_ENCODING_ADPCM_G723_3 = 25
127AUDIO_FILE_ENCODING_ADPCM_G723_5 = 26
128AUDIO_FILE_ENCODING_ALAW_8 = 27
129
130# from <multimedia/audio_hdr.h>
131AUDIO_UNKNOWN_SIZE = 0xFFFFFFFF        # ((unsigned)(~0))
132
133_simple_encodings = [AUDIO_FILE_ENCODING_MULAW_8,
134                     AUDIO_FILE_ENCODING_LINEAR_8,
135                     AUDIO_FILE_ENCODING_LINEAR_16,
136                     AUDIO_FILE_ENCODING_LINEAR_24,
137                     AUDIO_FILE_ENCODING_LINEAR_32,
138                     AUDIO_FILE_ENCODING_ALAW_8]
139
class Error(Exception):
    """Raised by this module for malformed AU headers or invalid parameter use."""
142
143def _read_u32(file):
144    x = 0
145    for i in range(4):
146        byte = file.read(1)
147        if not byte:
148            raise EOFError
149        x = x*256 + ord(byte)
150    return x
151
152def _write_u32(file, x):
153    data = []
154    for i in range(4):
155        d, m = divmod(x, 256)
156        data.insert(0, int(m))
157        x = d
158    file.write(bytes(data))
159
class Au_read:
    """Reader for Sun/NeXT audio data.

    *f* is either a file name, which is opened here in binary mode and
    closed again by close(), or an already open binary file object, which
    is left open by close().  The header is parsed during construction;
    Error is raised when it is malformed or uses an unsupported encoding.
    """

    # Class-level fallbacks keep close() and __del__() safe on an instance
    # whose __init__() raised before these attributes were assigned.
    _file = None
    _opened = False

    def __init__(self, f):
        if isinstance(f, str):
            import builtins
            f = builtins.open(f, 'rb')
            self._opened = True
        else:
            self._opened = False
        try:
            self.initfp(f)
        except BaseException:
            # Header parsing failed: release a file we opened ourselves
            # right away instead of waiting for garbage collection.
            self._file = None
            if self._opened:
                f.close()
            raise

    def __del__(self):
        if self._file is not None:
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        """Parse the AU header from *file* and remember where the data starts.

        Raises Error for a bad magic number, an implausible header size,
        an unsupported encoding or a channel count of zero, and EOFError
        when the header is truncated.
        """
        self._file = file
        self._soundpos = 0
        if _read_u32(file) != AUDIO_FILE_MAGIC:
            raise Error('bad magic number')
        self._hdr_size = _read_u32(file)
        if self._hdr_size < 24:
            raise Error('header size too small')
        if self._hdr_size > 100:
            raise Error('header size ridiculously large')
        # May be AUDIO_UNKNOWN_SIZE when the writer did not know the size.
        self._data_size = _read_u32(file)
        self._encoding = _read_u32(file)
        if self._encoding not in _simple_encodings:
            raise Error('encoding not (yet) supported')
        if self._encoding in (AUDIO_FILE_ENCODING_MULAW_8,
                              AUDIO_FILE_ENCODING_ALAW_8):
            # Companded samples take one byte in the file but are handed
            # to callers as 16-bit linear samples by readframes().
            self._sampwidth = 2
            self._framesize = 1
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_8:
            self._framesize = self._sampwidth = 1
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_16:
            self._framesize = self._sampwidth = 2
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_24:
            self._framesize = self._sampwidth = 3
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_32:
            self._framesize = self._sampwidth = 4
        else:
            raise Error('unknown encoding')
        self._framerate = _read_u32(file)
        self._nchannels = _read_u32(file)
        if not self._nchannels:
            raise Error('bad # of channels')
        # From here on _framesize is the on-disk size of one whole frame.
        self._framesize = self._framesize * self._nchannels
        if self._hdr_size > 24:
            # Everything between the fixed fields and the data is the
            # info string; keep only the part before the first NUL.
            self._info = file.read(self._hdr_size - 24)
            self._info, _, _ = self._info.partition(b'\0')
        else:
            self._info = b''
        try:
            self._data_pos = file.tell()
        except (AttributeError, OSError):
            # Unseekable stream: rewind() and setpos() will refuse to work.
            self._data_pos = None

    def getfp(self):
        """Return the underlying file object (None once closed)."""
        return self._file

    def getnchannels(self):
        """Return the number of audio channels."""
        return self._nchannels

    def getsampwidth(self):
        """Return the width in bytes of the samples readframes() returns."""
        return self._sampwidth

    def getframerate(self):
        """Return the sampling frequency stored in the header."""
        return self._framerate

    def getnframes(self):
        """Return the number of frames, or AUDIO_UNKNOWN_SIZE if unknown."""
        if self._data_size == AUDIO_UNKNOWN_SIZE:
            return AUDIO_UNKNOWN_SIZE
        if self._encoding in _simple_encodings:
            return self._data_size // self._framesize
        return 0                # XXX--must do some arithmetic here

    def getcomptype(self):
        """Return 'ULAW', 'ALAW' or 'NONE' according to the encoding."""
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'ULAW'
        elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'ALAW'
        else:
            return 'NONE'

    def getcompname(self):
        """Return a human-readable name for the compression type."""
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'CCITT G.711 u-law'
        elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'CCITT G.711 A-law'
        else:
            return 'not compressed'

    def getparams(self):
        """Return all stream parameters as a _sunau_params namedtuple."""
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                  self.getframerate(), self.getnframes(),
                  self.getcomptype(), self.getcompname())

    def getmarkers(self):
        """Return None (present for compatibility with the aifc module)."""
        return None

    def getmark(self, id):
        """Always raise Error (present for compatibility with aifc)."""
        raise Error('no marks')

    def readframes(self, nframes):
        """Read at most *nframes* frames and return them as linear samples.

        Passing AUDIO_UNKNOWN_SIZE reads everything that is left.  u-law
        and A-law data is expanded to linear samples of getsampwidth()
        bytes, so the returned data always matches the reported sample
        width.
        """
        if self._encoding not in _simple_encodings:
            return None             # XXX--not implemented yet
        if nframes == AUDIO_UNKNOWN_SIZE:
            data = self._file.read()
        else:
            data = self._file.read(nframes * self._framesize)
        self._soundpos += len(data) // self._framesize
        if self._encoding in (AUDIO_FILE_ENCODING_MULAW_8,
                              AUDIO_FILE_ENCODING_ALAW_8):
            with warnings.catch_warnings():
                warnings.simplefilter('ignore', category=DeprecationWarning)
                import audioop
            if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
                data = audioop.ulaw2lin(data, self._sampwidth)
            else:
                data = audioop.alaw2lin(data, self._sampwidth)
        return data

    def rewind(self):
        """Seek back to the first frame; OSError if the file cannot seek."""
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos)
        self._soundpos = 0

    def tell(self):
        """Return the current position, counted in frames."""
        return self._soundpos

    def setpos(self, pos):
        """Seek to frame *pos*.

        Raises Error when *pos* is outside 0..getnframes() and OSError
        when the underlying file cannot seek.
        """
        if pos < 0 or pos > self.getnframes():
            raise Error('position not in range')
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos + pos * self._framesize)
        self._soundpos = pos

    def close(self):
        """Detach from the file, closing it only if this object opened it."""
        file = self._file
        if file is not None:
            self._file = None
            if self._opened:
                file.close()
311
class Au_write:
    """Writer for Sun/NeXT audio data.

    *f* is either a file name, which is opened here in binary mode and
    closed again by close(), or an already open binary file object, which
    is only flushed by close().  Channel count, sample width and frame
    rate must be set before the first frames are written; the header is
    emitted exactly once, just before the first write or on close().
    """

    # Class-level fallbacks keep close() and __del__() safe on an instance
    # whose __init__() raised before these attributes were assigned.
    _file = None
    _opened = False
    _headerwritten = False
    _form_length_pos = None

    def __init__(self, f):
        if isinstance(f, str):
            import builtins
            f = builtins.open(f, 'wb')
            self._opened = True
        else:
            self._opened = False
        self.initfp(f)

    def __del__(self):
        if self._file is not None:
            self.close()
        self._file = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        """Attach to *file* and reset all parameters to their defaults."""
        self._file = file
        self._framerate = 0
        self._nchannels = 0
        self._sampwidth = 0
        self._framesize = 0
        self._nframes = AUDIO_UNKNOWN_SIZE
        self._nframeswritten = 0
        self._datawritten = 0
        self._datalength = 0
        self._info = b''
        self._comptype = 'ULAW' # default is U-law
        self._headerwritten = False
        self._form_length_pos = None

    def setnchannels(self, nchannels):
        """Set the channel count (1, 2 or 4); Error once writing started."""
        self._check_params_unlocked()
        if nchannels not in (1, 2, 4):
            raise Error('only 1, 2, or 4 channels supported')
        self._nchannels = nchannels

    def getnchannels(self):
        """Return the channel count; Error if it has not been set."""
        if not self._nchannels:
            raise Error('number of channels not set')
        return self._nchannels

    def setsampwidth(self, sampwidth):
        """Set the sample width in bytes (1-4); Error once writing started."""
        self._check_params_unlocked()
        if sampwidth not in (1, 2, 3, 4):
            raise Error('bad sample width')
        self._sampwidth = sampwidth

    def getsampwidth(self):
        """Return the sample width in bytes; Error if it has not been set."""
        if not self._sampwidth:
            raise Error('sample width not specified')
        return self._sampwidth

    def setframerate(self, framerate):
        """Set the sampling frequency; Error once writing started."""
        self._check_params_unlocked()
        self._framerate = framerate

    def getframerate(self):
        """Return the sampling frequency; Error if it has not been set."""
        if not self._framerate:
            raise Error('frame rate not set')
        return self._framerate

    def setnframes(self, nframes):
        """Announce the number of frames that will be written.

        When the announced value turns out to be correct the header does
        not have to be patched afterwards.
        """
        self._check_params_unlocked()
        if nframes < 0:
            raise Error('# of frames cannot be negative')
        self._nframes = nframes

    def getnframes(self):
        """Return the number of frames written so far."""
        return self._nframeswritten

    def setcomptype(self, type, name):
        """Select 'NONE' or 'ULAW' compression; *name* is ignored.

        Raises Error for any other type, or once writing has started.
        """
        self._check_params_unlocked()
        if type in ('NONE', 'ULAW'):
            self._comptype = type
        else:
            raise Error('unknown compression type')

    def getcomptype(self):
        """Return the selected compression type."""
        return self._comptype

    def getcompname(self):
        """Return a human-readable name for the compression type."""
        if self._comptype == 'ULAW':
            return 'CCITT G.711 u-law'
        elif self._comptype == 'ALAW':
            return 'CCITT G.711 A-law'
        else:
            return 'not compressed'

    def setparams(self, params):
        """Set all parameters from a 6-item sequence shaped like getparams()."""
        nchannels, sampwidth, framerate, nframes, comptype, compname = params
        self.setnchannels(nchannels)
        self.setsampwidth(sampwidth)
        self.setframerate(framerate)
        self.setnframes(nframes)
        self.setcomptype(comptype, compname)

    def getparams(self):
        """Return all stream parameters as a _sunau_params namedtuple."""
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                  self.getframerate(), self.getnframes(),
                  self.getcomptype(), self.getcompname())

    def tell(self):
        """Return the current position, counted in frames written."""
        return self._nframeswritten

    def writeframesraw(self, data):
        """Write audio frames without patching up the header sizes.

        *data* is linear sample data as bytes or any object exposing the
        buffer protocol; it is companded first when the compression type
        is 'ULAW'.  The header is written before the first frames.
        """
        if not isinstance(data, (bytes, bytearray)):
            data = memoryview(data).cast('B')
        self._ensure_header_written()
        if self._comptype == 'ULAW':
            with warnings.catch_warnings():
                warnings.simplefilter('ignore', category=DeprecationWarning)
                import audioop
            data = audioop.lin2ulaw(data, self._sampwidth)
        nframes = len(data) // self._framesize
        self._file.write(data)
        self._nframeswritten = self._nframeswritten + nframes
        self._datawritten = self._datawritten + len(data)

    def writeframes(self, data):
        """Write audio frames, then fix the header if its sizes are stale."""
        self.writeframesraw(data)
        if self._header_is_stale():
            self._patchheader()

    def close(self):
        """Finish the header, flush, and close the file if we opened it."""
        if self._file is None:
            return
        try:
            self._ensure_header_written()
            if self._header_is_stale():
                self._patchheader()
            self._file.flush()
        finally:
            file = self._file
            self._file = None
            if self._opened:
                file.close()

    #
    # private methods
    #

    def _check_params_unlocked(self):
        # Parameters are frozen as soon as the header has gone out: a later
        # change would contradict what is already in the file.
        if self._headerwritten or self._nframeswritten:
            raise Error('cannot change parameters after starting to write')

    def _header_is_stale(self):
        # True when the sizes recorded in the header no longer match what
        # has actually been written.
        return (self._nframeswritten != self._nframes or
                self._datalength != self._datawritten)

    def _ensure_header_written(self):
        # Guard on an explicit flag as well as the frame counter: after
        # writeframes(b'') the counter is still zero although the header
        # is already in the file, and it must not be written again.
        if self._headerwritten or self._nframeswritten:
            return
        if not self._nchannels:
            raise Error('# of channels not specified')
        if not self._sampwidth:
            raise Error('sample width not specified')
        if not self._framerate:
            raise Error('frame rate not specified')
        self._write_header()
        self._headerwritten = True

    def _write_header(self):
        if self._comptype == 'NONE':
            linear_encodings = {
                1: AUDIO_FILE_ENCODING_LINEAR_8,
                2: AUDIO_FILE_ENCODING_LINEAR_16,
                3: AUDIO_FILE_ENCODING_LINEAR_24,
                4: AUDIO_FILE_ENCODING_LINEAR_32,
            }
            encoding = linear_encodings.get(self._sampwidth)
            if encoding is None:
                raise Error('internal error')
            self._framesize = self._sampwidth
        elif self._comptype == 'ULAW':
            encoding = AUDIO_FILE_ENCODING_MULAW_8
            self._framesize = 1
        else:
            raise Error('internal error')
        self._framesize = self._framesize * self._nchannels
        _write_u32(self._file, AUDIO_FILE_MAGIC)
        # 24 bytes of fixed fields plus the info string and at least one
        # NUL terminator, rounded up to a multiple of 8.
        header_size = 25 + len(self._info)
        header_size = (header_size + 7) & ~7
        _write_u32(self._file, header_size)
        if self._nframes == AUDIO_UNKNOWN_SIZE:
            length = AUDIO_UNKNOWN_SIZE
        else:
            length = self._nframes * self._framesize
        try:
            # Remember where the data-size field lives so it can be
            # patched once the real size is known.
            self._form_length_pos = self._file.tell()
        except (AttributeError, OSError):
            self._form_length_pos = None
        _write_u32(self._file, length)
        self._datalength = length
        _write_u32(self._file, encoding)
        _write_u32(self._file, self._framerate)
        _write_u32(self._file, self._nchannels)
        self._file.write(self._info)
        self._file.write(b'\0'*(header_size - len(self._info) - 24))

    def _patchheader(self):
        # Overwrite the data-size field with the number of bytes actually
        # written, then return to the end of the file.
        if self._form_length_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._form_length_pos)
        _write_u32(self._file, self._datawritten)
        self._datalength = self._datawritten
        self._file.seek(0, 2)
521
def open(f, mode=None):
    """Open *f* for reading or writing AU data.

    *f* is a file name or an open file object.  When *mode* is omitted,
    the file object's own ``mode`` attribute is used if it has one,
    otherwise 'rb'.  Returns an Au_read for 'r'/'rb' and an Au_write for
    'w'/'wb'; any other mode raises Error.
    """
    if mode is None:
        mode = getattr(f, 'mode', 'rb')
    if mode in ('r', 'rb'):
        return Au_read(f)
    if mode in ('w', 'wb'):
        return Au_write(f)
    raise Error("mode must be 'r', 'rb', 'w', or 'wb'")
534