1# Lint as: python3
2"""Utils for blue tooth tests.
3
4Partly ported from acts/framework/acts/test_utils/bt/bt_test_utils.py
5"""
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11import logging as log
12import os
13import random
14import string
15import time
16import wave
17from queue import Empty
18from typing import Optional
19
20from mobly.controllers.android_device import AndroidDevice
21
22
class BtTestUtilsError(Exception):
    """Exception type declared by this Bluetooth test utility module.

    Adds no attributes or behavior on top of `Exception`.
    """
25
26
def convert_pcm_to_wav(pcm_file_path, wave_file_path, audio_params):
    """Wraps the raw bytes of a pcm file into a wave file.

    The pcm file is read completely into memory and its bytes are handed,
    together with `audio_params`, to `write_record_file`.

    Args:
        pcm_file_path: File path of the source pcm file; opened in binary mode.
        wave_file_path: File path of the wave file to be written.
        audio_params: A dict with audio configuration, passed unchanged to
            `write_record_file`.
    """
    with open(pcm_file_path, 'rb') as pcm_stream:
        raw_audio = pcm_stream.read()
    write_record_file(
        file_name=wave_file_path, audio_params=audio_params, frames=raw_audio)
38
39
def create_vcf_from_vcard(output_path: str,
                          num_of_contacts: int,
                          first_name: Optional[str] = None,
                          last_name: Optional[str] = None,
                          phone_number: Optional[int] = None) -> str:
    """Writes a vcf file containing a number of vCard 2.1 entries.

    Each entry consists of the N, FN, TEL;CELL and EMAIL;PREF fields.

    Args:
        output_path: Directory in which the vcf file is created.
        num_of_contacts: Number of contacts to be generated.
        first_name: First name of the contacts; 'Person' is used when None.
        last_name: Last name of the contacts; the zero-based index of the
            contact is used when None.
        phone_number: Phone number of the contacts; a new random number is
            drawn for every contact when None.

    Returns:
        vcf_file_path: Path of the output vcf file. E.g.
            "/<output_path>/contacts_<time>.vcf".
    """
    vcf_file_path = os.path.join(output_path,
                                 f'contacts_{int(time.time())}.vcf')
    # The fallback first name does not depend on the contact index.
    given_name = 'Person' if first_name is None else first_name
    with open(vcf_file_path, 'w+') as vcf_file:
        for index in range(num_of_contacts):
            family_name = index if last_name is None else last_name
            if phone_number is None:
                number = random.randrange(int(10e10))
            else:
                number = phone_number
            vcf_file.write('BEGIN:VCARD\n'
                           'VERSION:2.1\n'
                           f'N:{family_name};{given_name};;;\n'
                           f'FN:{given_name} {family_name}\n'
                           f'TEL;CELL:{number}\n'
                           f'EMAIL;PREF:{given_name}{family_name}@gmail.com\n'
                           'END:VCARD\n')
    return vcf_file_path
80
81
def generate_id_by_size(
        size,
        chars=(string.ascii_lowercase + string.ascii_uppercase + string.digits)):
    """Builds a random string of the requested length.

    Args:
        size: Number of characters in the returned string.
        chars: (Optional) Characters to draw from; defaults to the ascii
            lowercase letters, uppercase letters and digits.

    Returns:
        String of `size` characters, each picked with `random.choice` from
        `chars`.
    """
    picked = [random.choice(chars) for _ in range(size)]
    return ''.join(picked)
93
94
def get_duration_seconds(wav_file_path):
    """Gets the duration of a wave file.

    Args:
        wav_file_path: path of the wave file.

    Returns:
        duration (float): duration of the wave file in seconds, computed as
            the number of frames divided by the frame rate.
    """
    # The context manager closes the file even if a header query raises.
    with wave.open(wav_file_path, 'rb') as wav_file:
        frame_count = wav_file.getnframes()
        frame_rate = wav_file.getframerate()
    return frame_count / float(frame_rate)
110
111
def wait_until(timeout_sec, condition_func, func_args, expected_value, exception=None, interval_sec=0.5):
    """Waits until a function returns an expected value or timeout is reached.

    The condition function is always evaluated at least once, even when
    `timeout_sec` is zero or negative, and it is evaluated one final time
    after the last sleep before the wait is declared timed out.

    Example usage:
        ```
        def is_bluetooth_enabled(device) -> bool:
          do something and return something...

        # Waits and checks if Bluetooth is turned on.
        bt_test_utils.wait_until(
            timeout_sec=10,
            condition_func=is_bluetooth_enabled,
            func_args=[dut],
            expected_value=True,
            exception=signals.TestFailure('Failed to turn on Bluetooth.'),
            interval_sec=1)
        ```

    Args:
        timeout_sec: float, max waiting time in seconds.
        condition_func: function, when the condition function returns the
            expected value, the waiting mechanism will be interrupted.
        func_args: tuple or list, the arguments for the condition function.
        expected_value: the value the condition function is expected to return;
            compared with `==`.
        exception: Exception, raised on timeout when provided.
        interval_sec: float, interval time between calls of the condition function
            in seconds.

    Returns:
        True if the function returns the expected value else False.

    Raises:
        The given `exception`, if one was provided and the wait timed out.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_sec
    while True:
        if condition_func(*func_args) == expected_value:
            return True
        if time.monotonic() >= deadline:
            break
        time.sleep(interval_sec)
    args_string = ', '.join(map(str, func_args))
    # functools.partial objects and callable instances have no __name__; fall
    # back to repr() so the timeout path cannot fail with AttributeError.
    func_name = getattr(condition_func, '__name__', repr(condition_func))
    log.warning('Timed out after %.1fs waiting for "%s(%s)" to be "%s".', timeout_sec, func_name,
                args_string, expected_value)
    if exception:
        raise exception
    return False
155
156
def write_read_verify_data_sl4a(client_ad, server_ad, msg, binary=False):
    """Writes a message on the client and checks what the server reads back.

    Args:
        client_ad: the Android device to perform the write.
        server_ad: the Android device to read the data written.
        msg: the message to write.
        binary: when true the binary sl4a write/read calls are used, and
            trailing '\\r' and '\\n' characters are stripped from the value
            read; otherwise the plain write/read calls are used.

    Returns:
        True if the data written matches the data read, false if not.
    """
    client_ad.log.info('Write message %s.', msg)
    send = (client_ad.sl4a.bluetoothSocketConnWriteBinary
            if binary else client_ad.sl4a.bluetoothSocketConnWrite)
    send(msg)

    server_ad.log.info('Read message %s.', msg)
    if binary:
        received = server_ad.sl4a.bluetoothSocketConnReadBinary().rstrip('\r\n')
    else:
        received = server_ad.sl4a.bluetoothSocketConnRead()

    log.info('Verify message.')
    mismatch = msg != received
    if mismatch:
        log.error('Mismatch! Read: %s, Expected: %s', received, msg)
    else:
        log.info('Matched! Read: %s, Expected: %s', received, msg)
    return not mismatch
185
186
def write_record_file(file_name, audio_params, frames):
    """Writes the recorded audio into a wave file.

    Args:
        file_name: The file name for writing the recorded audio.
        audio_params: A dict with audio configuration. Requires the keys
            'channel' and 'sample_rate'; 'sample_width' is optional and
            defaults to 1.
        frames: Recorded audio frames.

    Raises:
        KeyError: If 'channel' or 'sample_rate' is missing from
            `audio_params`; raised before the output file is opened.
    """
    log.debug('writing frame to %s', file_name)
    # Read the configuration first so that an incomplete dict fails before
    # the target file is created or truncated.
    channels = audio_params['channel']
    sample_width = audio_params.get('sample_width', 1)
    sample_rate = audio_params['sample_rate']
    # The context manager closes the file even if writing raises.
    with wave.open(file_name, 'wb') as wave_file:
        wave_file.setnchannels(channels)
        wave_file.setsampwidth(sample_width)
        wave_file.setframerate(sample_rate)
        wave_file.writeframes(frames)
202