1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19
20import collections
21import collections.abc
22import logging
23import asyncio
24import dataclasses
25import enum
26import traceback
27import pyee
28import re
29from typing import (
30    Dict,
31    List,
32    Union,
33    Set,
34    Any,
35    Optional,
36    Type,
37    Tuple,
38    ClassVar,
39    Iterable,
40    TYPE_CHECKING,
41)
42from typing_extensions import Self
43
44from bumble import at
45from bumble import device
46from bumble import rfcomm
47from bumble import sdp
48from bumble.colors import color
49from bumble.core import (
50    ProtocolError,
51    BT_GENERIC_AUDIO_SERVICE,
52    BT_HANDSFREE_SERVICE,
53    BT_HANDSFREE_AUDIO_GATEWAY_SERVICE,
54    BT_L2CAP_PROTOCOL_ID,
55    BT_RFCOMM_PROTOCOL_ID,
56)
57from bumble.hci import (
58    HCI_Enhanced_Setup_Synchronous_Connection_Command,
59    CodingFormat,
60    CodecID,
61)
62
63
64# -----------------------------------------------------------------------------
65# Logging
66# -----------------------------------------------------------------------------
67logger = logging.getLogger(__name__)
68
69# -----------------------------------------------------------------------------
70# Error
71# -----------------------------------------------------------------------------
72
73
class HfpProtocolError(ProtocolError):
    """Error raised by the HFP implementation in this module.

    Delegates to `ProtocolError`, passing `None` and `'hfp'` as the first two
    positional arguments, followed by `error_name` and `details`.
    """

    def __init__(self, error_name: str = '', details: str = ''):
        super().__init__(None, 'hfp', error_name, details)
77
78
79# -----------------------------------------------------------------------------
80# Protocol Support
81# -----------------------------------------------------------------------------
82
83
84# -----------------------------------------------------------------------------
class HfpProtocol:
    """Line-oriented reader/writer on top of an RFCOMM DLC.

    Received data is accumulated in `buffer`, split on carriage returns, and
    every stripped, non-empty line is queued in `lines`, from which it can be
    awaited with `next_line()`.
    """

    dlc: rfcomm.DLC
    buffer: str
    lines: collections.deque
    lines_available: asyncio.Event

    def __init__(self, dlc: rfcomm.DLC) -> None:
        self.buffer = ''
        self.lines = collections.deque()
        self.lines_available = asyncio.Event()
        self.dlc = dlc

        # From now on, everything received on the channel is handed to feed().
        dlc.sink = self.feed

    def feed(self, data: Union[bytes, str]) -> None:
        """Accumulate received data and dispatch each complete line."""
        # Work on text: bytes are decoded, strings are taken as they are.
        text = data.decode('utf-8') if isinstance(data, bytes) else data

        logger.debug(f'<<< Data received: {text}')

        # Peel off one carriage-return-terminated line at a time; whatever
        # follows the last separator stays buffered for the next call.
        self.buffer += text
        while '\r' in self.buffer:
            head, _, self.buffer = self.buffer.partition('\r')
            line = head.strip()
            if line:
                self.on_line(line)

    def on_line(self, line: str) -> None:
        """Queue a received line and wake up any waiter."""
        self.lines.append(line)
        self.lines_available.set()

    def send_command_line(self, line: str) -> None:
        """Write `line` terminated by a carriage return."""
        logger.debug(color(f'>>> {line}', 'yellow'))
        terminated = line + '\r'
        self.dlc.write(terminated)

    def send_response_line(self, line: str) -> None:
        """Write `line` framed by CR LF on both sides."""
        logger.debug(color(f'>>> {line}', 'yellow'))
        framed = '\r\n' + line + '\r\n'
        self.dlc.write(framed)

    async def next_line(self) -> str:
        """Wait for and return the oldest queued line."""
        await self.lines_available.wait()
        oldest = self.lines.popleft()
        # Re-arm the event once the queue has been drained.
        if not self.lines:
            self.lines_available.clear()
        logger.debug(color(f'<<< {oldest}', 'green'))
        return oldest
133
134
135# -----------------------------------------------------------------------------
136# Normative protocol definitions
137# -----------------------------------------------------------------------------
138
139
class HfFeature(enum.IntFlag):
    """
    Bitmask of the features supported by the HF, as sent in AT+BRSF= (normative).

    Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
    """

    # One bit per feature, listed from bit 0 upwards.
    EC_NR = 1 << 0  # Echo Cancel & Noise reduction
    THREE_WAY_CALLING = 1 << 1
    CLI_PRESENTATION_CAPABILITY = 1 << 2
    VOICE_RECOGNITION_ACTIVATION = 1 << 3
    REMOTE_VOLUME_CONTROL = 1 << 4
    ENHANCED_CALL_STATUS = 1 << 5
    ENHANCED_CALL_CONTROL = 1 << 6
    CODEC_NEGOTIATION = 1 << 7
    HF_INDICATORS = 1 << 8
    ESCO_S4_SETTINGS_SUPPORTED = 1 << 9
    ENHANCED_VOICE_RECOGNITION_STATUS = 1 << 10
    VOICE_RECOGNITION_TEST = 1 << 11
159
160
class AgFeature(enum.IntFlag):
    """
    Bitmask of the features supported by the AG, as reported in +BRSF: (normative).

    Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
    """

    # One bit per feature, listed from bit 0 upwards.
    THREE_WAY_CALLING = 1 << 0
    EC_NR = 1 << 1  # Echo Cancel & Noise reduction
    VOICE_RECOGNITION_FUNCTION = 1 << 2
    IN_BAND_RING_TONE_CAPABILITY = 1 << 3
    VOICE_TAG = 1 << 4  # Attach a number to voice tag
    REJECT_CALL = 1 << 5  # Ability to reject a call
    ENHANCED_CALL_STATUS = 1 << 6
    ENHANCED_CALL_CONTROL = 1 << 7
    EXTENDED_ERROR_RESULT_CODES = 1 << 8
    CODEC_NEGOTIATION = 1 << 9
    HF_INDICATORS = 1 << 10
    ESCO_S4_SETTINGS_SUPPORTED = 1 << 11
    ENHANCED_VOICE_RECOGNITION_STATUS = 1 << 12
    VOICE_RECOGNITION_TEST = 1 << 13
182
183
class AudioCodec(enum.IntEnum):
    """
    Identifiers of the audio codecs (normative).

    Hands-Free Profile v1.9, 11 Appendix B
    """

    # Support for CVSD audio codec
    CVSD = 1
    # Support for mSBC audio codec
    MSBC = 2
    # Support for LC3-SWB audio codec
    LC3_SWB = 3
194
195
class HfIndicator(enum.IntEnum):
    """
    Assigned numbers of the HF indicators (normative).

    Bluetooth Assigned Numbers, 6.10.1 HF Indicators.
    """

    # Enhanced safety feature
    ENHANCED_SAFETY = 1
    # Battery level feature
    BATTERY_LEVEL = 2
205
206
class CallHoldOperation(enum.Enum):
    """
    Operations selectable through the call hold command (normative).

    Each value is the operation string; a trailing ``x`` marks the operations
    that target one specific call.

    AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services.
    """

    # Release all held calls
    RELEASE_ALL_HELD_CALLS = "0"
    # Release all active calls, accept other
    RELEASE_ALL_ACTIVE_CALLS = "1"
    # Release a specific call X
    RELEASE_SPECIFIC_CALL = "1x"
    # Place all active calls on hold, accept other
    HOLD_ALL_ACTIVE_CALLS = "2"
    # Place all active calls except call X
    HOLD_ALL_CALLS_EXCEPT = "2x"
    # Adds a held call to conversation
    ADD_HELD_CALL = "3"
    # Connects the two calls and disconnects the subscriber from both calls
    CONNECT_TWO_CALLS = "4"
223
224
class ResponseHoldStatus(enum.IntEnum):
    """
    Response Hold status (normative).

    Member names are abbreviated: INC = incoming, ACC = accept, REJ = reject.

    Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
    """

    INC_CALL_HELD = 0  # Put incoming call on hold
    HELD_CALL_ACC = 1  # Accept a held incoming call
    HELD_CALL_REJ = 2  # Reject a held incoming call
235
236
class AgIndicator(enum.Enum):
    """
    Values for the AG indicator (normative).

    Each member's value is the indicator's name as a string: it is the text
    quoted by `AgIndicatorState.on_test_text`, and the string from which the HF
    builds the member when parsing the AT+CIND=? response.

    Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
    """

    SERVICE = 'service'
    CALL = 'call'
    CALL_SETUP = 'callsetup'
    CALL_HELD = 'callheld'
    SIGNAL = 'signal'
    ROAM = 'roam'
    BATTERY_CHARGE = 'battchg'
251
252
class CallSetupAgIndicator(enum.IntEnum):
    """
    Values for the Call Setup AG indicator (normative).

    Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
    """

    NOT_IN_CALL_SETUP = 0  # No call setup in progress
    INCOMING_CALL_PROCESS = 1  # Incoming call setup in progress
    OUTGOING_CALL_SETUP = 2  # Outgoing call setup in progress
    REMOTE_ALERTED = 3  # Remote party alerted in an outgoing call
264
265
class CallHeldAgIndicator(enum.IntEnum):
    """
    Values for the Call Held AG indicator (normative).

    Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
    """

    NO_CALLS_HELD = 0  # No call is on hold
    # Call is placed on hold or active/held calls swapped
    # (The AG has both an active AND a held call)
    CALL_ON_HOLD_AND_ACTIVE_CALL = 1
    CALL_ON_HOLD_NO_ACTIVE_CALL = 2  # Call on hold, no active call
278
279
class CallInfoDirection(enum.IntEnum):
    """
    Call Info direction (normative).

    Type of the `direction` field of `CallInfo`.

    AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.
    """

    MOBILE_ORIGINATED_CALL = 0  # Call originated by the mobile
    MOBILE_TERMINATED_CALL = 1  # Call terminated at the mobile
289
290
class CallInfoStatus(enum.IntEnum):
    """
    Call Info status (normative).

    Type of the `status` field of `CallInfo`.

    AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.
    """

    ACTIVE = 0
    HELD = 1
    DIALING = 2
    ALERTING = 3
    INCOMING = 4
    WAITING = 5
304
305
class CallInfoMode(enum.IntEnum):
    """
    Call Info mode (normative).

    Type of the `mode` field of `CallInfo`. Note that the values are not
    contiguous: UNKNOWN is 9.

    AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.
    """

    VOICE = 0
    DATA = 1
    FAX = 2
    UNKNOWN = 9
317
318
class CallInfoMultiParty(enum.IntEnum):
    """
    Call Info Multi-Party state (normative).

    Type of the `multi_party` field of `CallInfo`.

    AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.
    """

    NOT_IN_CONFERENCE = 0  # Call is not part of a conference
    IN_CONFERENCE = 1  # Call is part of a conference
328
329
@dataclasses.dataclass
class CallInfo:
    """
    Enhanced call status.

    AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.

    Attributes:
        index: Index of the call.
        direction: Direction of the call (see `CallInfoDirection`).
        status: Status of the call (see `CallInfoStatus`).
        mode: Mode of the call (see `CallInfoMode`).
        multi_party: Whether the call is part of a conference (see
        `CallInfoMultiParty`).
        number: Optional phone number, as a string.
        type: Optional integer type associated with `number` (presumably the
        type-of-address octet, as for `CallLineIdentification.type` - to be
        confirmed against the +CLCC reference).
    """

    index: int
    direction: CallInfoDirection
    status: CallInfoStatus
    mode: CallInfoMode
    multi_party: CallInfoMultiParty
    number: Optional[str] = None
    type: Optional[int] = None
345
346
@dataclasses.dataclass
class CallLineIdentification:
    """
    Calling Line Identification notification.

    TS 127 007 - V6.8.0, 7.6 Calling line identification presentation +CLIP, but only
    number, type and alpha are meaningful in HFP.

    Attributes:
        number: String type phone number of format specified by `type`.
        type: Type of address octet in integer format (refer TS 24.008 [8] subclause
        10.5.4.7).
        subaddr: String type subaddress of format specified by `satype`.
        satype: Type of subaddress octet in integer format (refer TS 24.008 [8]
        subclause 10.5.4.8).
        alpha: Optional string type alphanumeric representation of number corresponding
        to the entry found in phonebook; used character set should be the one selected
        with command Select TE Character Set +CSCS.
        cli_validity: 0 CLI valid, 1 CLI has been withheld by the originator, 2 CLI is
        not available due to interworking problems or limitations of originating
        network.
    """

    number: str
    type: int
    subaddr: Optional[str] = None
    satype: Optional[int] = None
    alpha: Optional[str] = None
    cli_validity: Optional[int] = None

    @classmethod
    def parse_from(cls: Type[Self], parameters: List[bytes]) -> Self:
        """Build an instance from the positional parameters of a +CLIP response.

        Args:
            parameters: Raw parameters, in the order number, type, subaddr,
            satype, alpha, cli_validity. Only the first two are required.

        Returns:
            The parsed notification. Trailing parameters that are absent are
            set to None; `satype` and `cli_validity` are also None when the
            parameter is present but empty.
        """
        return cls(
            number=parameters[0].decode(),
            type=int(parameters[1]),
            subaddr=parameters[2].decode() if len(parameters) >= 3 else None,
            satype=(
                int(parameters[3]) if len(parameters) >= 4 and parameters[3] else None
            ),
            alpha=parameters[4].decode() if len(parameters) >= 5 else None,
            cli_validity=(
                int(parameters[5]) if len(parameters) >= 6 and parameters[5] else None
            ),
        )

    def to_clip_string(self) -> str:
        """Serialize the fields as comma-separated +CLIP parameters.

        Returns:
            The six fields joined by commas, in declaration order. A field set
            to None is emitted as an empty string.
        """
        # Only None means "absent": 0 is a legitimate value (e.g. cli_validity 0
        # is "CLI valid") and must be emitted, so a truthiness test is wrong here.
        return ','.join(
            '' if arg is None else str(arg)
            for arg in (
                self.number,
                self.type,
                self.subaddr,
                self.satype,
                self.alpha,
                self.cli_validity,
            )
        )
404
405
class VoiceRecognitionState(enum.IntEnum):
    """
    vrec values provided in AT+BVRA command.

    Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
    """

    DISABLE = 0  # Voice recognition disabled
    ENABLE = 1  # Voice recognition enabled
    # (Enhanced Voice Recognition Status only) HF is ready to accept audio.
    ENHANCED_READY = 2
417
418
class CmeError(enum.IntEnum):
    """
    CME ERROR codes (partial list).

    TS 127 007 - V6.8.0, 9.2.1 General errors
    """

    PHONE_FAILURE = 0
    OPERATION_NOT_ALLOWED = 3
    OPERATION_NOT_SUPPORTED = 4
    MEMORY_FULL = 20
    INVALID_INDEX = 21
    NOT_FOUND = 22
432
433
434# -----------------------------------------------------------------------------
435# Hands-Free Control Interoperability Requirements
436# -----------------------------------------------------------------------------
437
# Response codes.
# While a command is in flight (command lock held), HfProtocol._read_at forwards
# a received code found in this set (or in STATUS_CODES) to the response queue.
RESPONSE_CODES = {
    "+APLSIRI",
    "+BAC",
    "+BCC",
    "+BCS",
    "+BIA",
    "+BIEV",
    "+BIND",
    "+BINP",
    "+BLDN",
    "+BRSF",
    "+BTRH",
    "+BVRA",
    "+CCWA",
    "+CHLD",
    "+CHUP",
    "+CIND",
    "+CLCC",
    "+CLIP",
    "+CMEE",
    "+CMER",
    "+CNUM",
    "+COPS",
    "+IPHONEACCEV",
    "+NREC",
    "+VGM",
    "+VGS",
    "+VTS",
    "+XAPL",
    "A",
    "D",
}

# Unsolicited responses and statuses.
# HfProtocol._read_at forwards a received code found in this set to the
# unsolicited queue when it was not consumed as a command response; codes in
# none of the sets are dropped with a warning.
UNSOLICITED_CODES = {
    "+APLSIRI",
    "+BCS",
    "+BIND",
    "+BSIR",
    "+BTRH",
    "+BVRA",
    "+CCWA",
    "+CIEV",
    "+CLIP",
    "+VGM",
    "+VGS",
    "BLACKLISTED",
    "BUSY",
    "DELAYED",
    "NO ANSWER",
    "NO CARRIER",
    "RING",
}

# Status codes
# These terminate a command in HfProtocol.execute_command: "OK" completes it,
# any other member is raised as an HfpProtocolError.
STATUS_CODES = {
    "+CME ERROR",
    "BLACKLISTED",
    "BUSY",
    "DELAYED",
    "ERROR",
    "NO ANSWER",
    "NO CARRIER",
    "OK",
}
504
505
@dataclasses.dataclass
class HfConfiguration:
    """Local capabilities handed to `HfProtocol`.

    Attributes:
        supported_hf_features: HF features; `HfProtocol` sums them into the
        bitmap sent with AT+BRSF=.
        supported_hf_indicators: HF indicators; `HfProtocol` creates one
        `HfIndicatorState` per entry.
        supported_audio_codecs: Audio codecs; `HfProtocol` sends their values
        with AT+BAC= when codec negotiation is supported on both sides.
    """

    supported_hf_features: List[HfFeature]
    supported_hf_indicators: List[HfIndicator]
    supported_audio_codecs: List[AudioCodec]
511
512
@dataclasses.dataclass
class AgConfiguration:
    """Capabilities of an Audio Gateway (AG).

    NOTE(review): the consumer of this configuration is not visible in this
    part of the file; presumably it is the AG-side protocol implementation.

    Attributes:
        supported_ag_features: AG features.
        supported_ag_indicators: AG indicator states, as an ordered sequence.
        supported_hf_indicators: HF indicators.
        supported_ag_call_hold_operations: Call hold operations.
        supported_audio_codecs: Audio codecs.
    """

    supported_ag_features: Iterable[AgFeature]
    supported_ag_indicators: collections.abc.Sequence[AgIndicatorState]
    supported_hf_indicators: Iterable[HfIndicator]
    supported_ag_call_hold_operations: Iterable[CallHoldOperation]
    supported_audio_codecs: Iterable[AudioCodec]
520
521
class AtResponseType(enum.Enum):
    """
    Indicates if a response is expected from an AT command, and if multiple responses are accepted.

    Interpreted by `HfProtocol.execute_command` once the OK status is received.
    """

    NONE = 0  # Nothing is returned (None)
    SINGLE = 1  # Exactly one response is required and returned
    MULTIPLE = 2  # All collected responses are returned as a list
530
531
@dataclasses.dataclass
class AtResponse:
    """AT response: a code and its parsed parameters.

    Attributes:
        code: Text found before the first ``:`` of the response (the whole
        response when it contains no ``:``).
        parameters: Result of `at.parse_parameters` applied to what follows the
        first ``:``.
    """

    code: str
    parameters: list

    @classmethod
    def parse_from(cls: Type[Self], buffer: bytearray) -> Self:
        """Parse a raw response (without its CR LF framing).

        Args:
            buffer: Raw bytes of one response.

        Returns:
            The parsed response.
        """
        # Split on the first ':' only: the parameters may themselves contain a
        # colon (e.g. inside a quoted string) and must not be truncated there.
        code_and_parameters = buffer.split(b':', 1)
        parameters = (
            code_and_parameters[1] if len(code_and_parameters) > 1 else bytearray()
        )
        return cls(
            code=code_and_parameters[0].decode(),
            parameters=at.parse_parameters(parameters),
        )
547
548
@dataclasses.dataclass
class AtCommand:
    """AT command, split into its code, sub-code and parameters.

    Attributes:
        code: Command code: the letters following ``AT+``, or ``A`` / ``D`` for
        the ``ATA`` / ``ATD`` commands.
        sub_code: Form of the command (see `SubCode`).
        parameters: Parsed parameters; for ``ATD``, a single item holding
        everything that follows ``ATD``.
    """

    class SubCode(str, enum.Enum):
        """Marker following the command code."""

        NONE = ''
        SET = '='
        TEST = '=?'
        READ = '?'

    code: str
    sub_code: SubCode
    parameters: list

    _PARSE_PATTERN: ClassVar[re.Pattern] = re.compile(
        r'AT\+(?P<code>[A-Z]+)(?P<sub_code>=\?|=|\?)?(?P<parameters>.*)'
    )

    @classmethod
    def parse_from(cls: Type[Self], buffer: bytearray) -> Self:
        """Parse a raw command line.

        Args:
            buffer: Raw bytes of the command.

        Returns:
            The parsed command.

        Raises:
            HfpProtocolError: the buffer is neither an ``AT+...`` command nor
            an ``ATA`` / ``ATD`` command.
        """
        parsed = cls._PARSE_PATTERN.fullmatch(buffer.decode())

        # Regular "AT+<code>..." form.
        if parsed is not None:
            parameters_text = parsed.group('parameters')
            if parameters_text:
                parameters = at.parse_parameters(parameters_text.encode())
            else:
                parameters = []
            sub_code_text = parsed.group('sub_code') or ''
            return cls(
                code=parsed.group('code'),
                sub_code=AtCommand.SubCode(sub_code_text),
                parameters=parameters,
            )

        # Commands without the '+' prefix.
        if buffer.startswith(b'ATA'):
            return cls(code='A', sub_code=AtCommand.SubCode.NONE, parameters=[])
        if buffer.startswith(b'ATD'):
            dialed = buffer[3:]
            return cls(code='D', sub_code=AtCommand.SubCode.NONE, parameters=[dialed])

        raise HfpProtocolError('Invalid command')
585
586
@dataclasses.dataclass
class AgIndicatorState:
    """State wrapper of AG indicator.

    Attributes:
        indicator: Indicator of this indicator state.
        supported_values: Supported values of this indicator.
        current_status: Current status of this indicator.
        index: (HF only) Index of this indicator.
        enabled: (AG only) Whether this indicator is enabled to report.

    Properties:
        on_test_text: Text message reported in AT+CIND=? of this indicator.
    """

    indicator: AgIndicator
    supported_values: Set[int]
    current_status: int
    index: Optional[int] = None
    enabled: bool = True

    @property
    def on_test_text(self) -> str:
        """Description of this indicator for the AT+CIND=? test response.

        Returns:
            ``("<name>",(<min>-<max>))`` when the supported values form a
            contiguous range, ``("<name>",(<v1>,<v2>,...))`` in ascending order
            otherwise.

        Raises:
            ValueError: `supported_values` is empty.
        """
        min_value = min(self.supported_values)
        max_value = max(self.supported_values)
        if len(self.supported_values) == (max_value - min_value + 1):
            supported_values_text = f'({min_value}-{max_value})'
        else:
            # Sets are unordered: sort so that the text is deterministic.
            values_list = ",".join(str(v) for v in sorted(self.supported_values))
            supported_values_text = f'({values_list})'
        return f'("{self.indicator.value}",{supported_values_text})'

    @classmethod
    def call(cls: Type[Self]) -> Self:
        """Default call indicator state."""
        return cls(
            indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0
        )

    @classmethod
    def callsetup(cls: Type[Self]) -> Self:
        """Default callsetup indicator state."""
        return cls(
            indicator=AgIndicator.CALL_SETUP,
            supported_values={0, 1, 2, 3},
            current_status=0,
        )

    @classmethod
    def callheld(cls: Type[Self]) -> Self:
        """Default callheld indicator state."""
        return cls(
            indicator=AgIndicator.CALL_HELD,
            supported_values={0, 1, 2},
            current_status=0,
        )

    @classmethod
    def service(cls: Type[Self]) -> Self:
        """Default service indicator state."""
        return cls(
            indicator=AgIndicator.SERVICE, supported_values={0, 1}, current_status=0
        )

    @classmethod
    def signal(cls: Type[Self]) -> Self:
        """Default signal indicator state."""
        return cls(
            indicator=AgIndicator.SIGNAL,
            supported_values={0, 1, 2, 3, 4, 5},
            current_status=0,
        )

    @classmethod
    def roam(cls: Type[Self]) -> Self:
        """Default roam indicator state."""
        # Must be the ROAM indicator: using CALL here would make the AG
        # advertise "call" twice and never advertise "roam".
        return cls(
            indicator=AgIndicator.ROAM, supported_values={0, 1}, current_status=0
        )

    @classmethod
    def battchg(cls: Type[Self]) -> Self:
        """Default battery charge indicator state."""
        return cls(
            indicator=AgIndicator.BATTERY_CHARGE,
            supported_values={0, 1, 2, 3, 4, 5},
            current_status=0,
        )
674
675
@dataclasses.dataclass
class HfIndicatorState:
    """State wrapper of HF indicator.

    Attributes:
        indicator: Indicator of this indicator state.
        supported: Whether this indicator is supported.
        enabled: Whether this indicator is enabled.
        current_status: Current (last-reported) status value of this indicator.
    """

    indicator: HfIndicator
    supported: bool = False
    enabled: bool = False
    current_status: int = 0
691
692
693class HfProtocol(pyee.EventEmitter):
694    """
695    Implementation for the Hands-Free side of the Hands-Free profile.
696
697    Reference specification Hands-Free Profile v1.8.
698
699    Emitted events:
700        codec_negotiation: When codec is renegotiated, notify the new codec.
701            Args:
702                active_codec: AudioCodec
703        ag_indicator: When AG update their indicators, notify the new state.
704            Args:
705                ag_indicator: AgIndicator
706        speaker_volume: Emitted when AG update speaker volume autonomously.
707            Args:
708                volume: Int
709        microphone_volume: Emitted when AG update microphone volume autonomously.
710            Args:
711                volume: Int
        ring: Emitted when AG sends a ringtone request.
713            Args:
714                None
715        cli_notification: Emitted when notify the call metadata on line.
716            Args:
717                cli_notification: CallLineIdentification
718        voice_recognition: Emitted when AG starts voice recognition autonomously.
719            Args:
720                vrec: VoiceRecognitionState
721    """
722
723    class HfLoopTermination(HfpProtocolError):
724        """Termination signal for run() loop."""
725
726    supported_hf_features: int
727    supported_audio_codecs: List[AudioCodec]
728
729    supported_ag_features: int
730    supported_ag_call_hold_operations: List[CallHoldOperation]
731
732    ag_indicators: List[AgIndicatorState]
733    hf_indicators: Dict[HfIndicator, HfIndicatorState]
734
735    dlc: rfcomm.DLC
736    command_lock: asyncio.Lock
737    if TYPE_CHECKING:
738        response_queue: asyncio.Queue[AtResponse]
739        unsolicited_queue: asyncio.Queue[Optional[AtResponse]]
740    else:
741        response_queue: asyncio.Queue
742        unsolicited_queue: asyncio.Queue
743    read_buffer: bytearray
744    active_codec: AudioCodec
745
746    def __init__(
747        self,
748        dlc: rfcomm.DLC,
749        configuration: HfConfiguration,
750    ) -> None:
751        super().__init__()
752
753        # Configure internal state.
754        self.dlc = dlc
755        self.command_lock = asyncio.Lock()
756        self.response_queue = asyncio.Queue()
757        self.unsolicited_queue = asyncio.Queue()
758        self.read_buffer = bytearray()
759        self.active_codec = AudioCodec.CVSD
760        self._slc_initialized = False
761
762        # Build local features.
763        self.supported_hf_features = sum(configuration.supported_hf_features)
764        self.supported_audio_codecs = configuration.supported_audio_codecs
765
766        self.hf_indicators = {
767            indicator: HfIndicatorState(indicator=indicator)
768            for indicator in configuration.supported_hf_indicators
769        }
770
771        # Clear remote features.
772        self.supported_ag_features = 0
773        self.supported_ag_call_hold_operations = []
774        self.ag_indicators = []
775
776        # Bind the AT reader to the RFCOMM channel.
777        self.dlc.sink = self._read_at
778        # Stop the run() loop when L2CAP is closed.
779        self.dlc.multiplexer.l2cap_channel.on(
780            'close', lambda: self.unsolicited_queue.put_nowait(None)
781        )
782
783    def supports_hf_feature(self, feature: HfFeature) -> bool:
784        return (self.supported_hf_features & feature) != 0
785
786    def supports_ag_feature(self, feature: AgFeature) -> bool:
787        return (self.supported_ag_features & feature) != 0
788
789    def _read_at(self, data: bytes):
790        """
791        Reads AT messages from the RFCOMM channel.
792
793        Enqueues AT commands, responses, unsolicited responses to their respective queues, and set the corresponding event.
794        """
795        # Append to the read buffer.
796        self.read_buffer.extend(data)
797
798        # Locate header and trailer.
799        header = self.read_buffer.find(b'\r\n')
800        trailer = self.read_buffer.find(b'\r\n', header + 2)
801        if header == -1 or trailer == -1:
802            return
803
804        # Isolate the AT response code and parameters.
805        raw_response = self.read_buffer[header + 2 : trailer]
806        response = AtResponse.parse_from(raw_response)
807        logger.debug(f"<<< {raw_response.decode()}")
808
809        # Consume the response bytes.
810        self.read_buffer = self.read_buffer[trailer + 2 :]
811
812        # Forward the received code to the correct queue.
813        if self.command_lock.locked() and (
814            response.code in STATUS_CODES or response.code in RESPONSE_CODES
815        ):
816            self.response_queue.put_nowait(response)
817        elif response.code in UNSOLICITED_CODES:
818            self.unsolicited_queue.put_nowait(response)
819        else:
820            logger.warning(f"dropping unexpected response with code '{response.code}'")
821
822    async def execute_command(
823        self,
824        cmd: str,
825        timeout: float = 1.0,
826        response_type: AtResponseType = AtResponseType.NONE,
827    ) -> Union[None, AtResponse, List[AtResponse]]:
828        """
829        Sends an AT command and wait for the peer response.
830        Wait for the AT responses sent by the peer, to the status code.
831
832        Args:
833            cmd: the AT command in string to execute.
834            timeout: timeout in float seconds.
835            response_type: type of response.
836
837        Raises:
838            asyncio.TimeoutError: the status is not received after a timeout (default 1 second).
839            ProtocolError: the status is not OK.
840        """
841        async with self.command_lock:
842            logger.debug(f">>> {cmd}")
843            self.dlc.write(cmd + '\r')
844            responses: List[AtResponse] = []
845
846            while True:
847                result = await asyncio.wait_for(
848                    self.response_queue.get(), timeout=timeout
849                )
850                if result.code == 'OK':
851                    if response_type == AtResponseType.SINGLE and len(responses) != 1:
852                        raise HfpProtocolError("NO ANSWER")
853
854                    if response_type == AtResponseType.MULTIPLE:
855                        return responses
856                    if response_type == AtResponseType.SINGLE:
857                        return responses[0]
858                    return None
859                if result.code in STATUS_CODES:
860                    raise HfpProtocolError(result.code)
861                responses.append(result)
862
863    async def initiate_slc(self):
864        """4.2.1 Service Level Connection Initialization."""
865
866        # 4.2.1.1 Supported features exchange
867        # First, in the initialization procedure, the HF shall send the
868        # AT+BRSF=<HF supported features> command to the AG to both notify
869        # the AG of the supported features in the HF, as well as to retrieve the
870        # supported features in the AG using the +BRSF result code.
871        response = await self.execute_command(
872            f"AT+BRSF={self.supported_hf_features}", response_type=AtResponseType.SINGLE
873        )
874
875        self.supported_ag_features = int(response.parameters[0])
876        logger.info(f"supported AG features: {self.supported_ag_features}")
877        for feature in AgFeature:
878            if self.supports_ag_feature(feature):
879                logger.info(f"  - {feature.name}")
880
881        # 4.2.1.2 Codec Negotiation
882        # Secondly, in the initialization procedure, if the HF supports the
883        # Codec Negotiation feature, it shall check if the AT+BRSF command
884        # response from the AG has indicated that it supports the Codec
885        # Negotiation feature.
886        if self.supports_hf_feature(
887            HfFeature.CODEC_NEGOTIATION
888        ) and self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION):
889            # If both the HF and AG do support the Codec Negotiation feature
890            # then the HF shall send the AT+BAC=<HF available codecs> command to
891            # the AG to notify the AG of the available codecs in the HF.
892            codecs = [str(c.value) for c in self.supported_audio_codecs]
893            await self.execute_command(f"AT+BAC={','.join(codecs)}")
894
895        # 4.2.1.3 AG Indicators
896        # After having retrieved the supported features in the AG, the HF shall
897        # determine which indicators are supported by the AG, as well as the
898        # ordering of the supported indicators. This is because, according to
899        # the 3GPP 27.007 specification [2], the AG may support additional
900        # indicators not provided for by the Hands-Free Profile, and because the
901        # ordering of the indicators is implementation specific. The HF uses
902        # the AT+CIND=? Test command to retrieve information about the supported
903        # indicators and their ordering.
904        response = await self.execute_command(
905            "AT+CIND=?", response_type=AtResponseType.SINGLE
906        )
907
908        self.ag_indicators = []
909        for index, indicator in enumerate(response.parameters):
910            description = AgIndicator(indicator[0].decode())
911            supported_values = []
912            for value in indicator[1]:
913                value = value.split(b'-')
914                value = [int(v) for v in value]
915                value_min = value[0]
916                value_max = value[1] if len(value) > 1 else value[0]
917                supported_values.extend([v for v in range(value_min, value_max + 1)])
918
919            self.ag_indicators.append(
920                AgIndicatorState(description, index, set(supported_values), 0)
921            )
922
923        # Once the HF has the necessary supported indicator and ordering
924        # information, it shall retrieve the current status of the indicators
925        # in the AG using the AT+CIND? Read command.
926        response = await self.execute_command(
927            "AT+CIND?", response_type=AtResponseType.SINGLE
928        )
929
930        for index, indicator in enumerate(response.parameters):
931            self.ag_indicators[index].current_status = int(indicator)
932
933        # After having retrieved the status of the indicators in the AG, the HF
934        # shall then enable the "Indicators status update" function in the AG by
935        # issuing the AT+CMER command, to which the AG shall respond with OK.
936        await self.execute_command("AT+CMER=3,,,1")
937
938        if self.supports_hf_feature(
939            HfFeature.THREE_WAY_CALLING
940        ) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
941            # After the HF has enabled the “Indicators status update” function in
942            # the AG, and if the “Call waiting and 3-way calling” bit was set in the
943            # supported features bitmap by both the HF and the AG, the HF shall
944            # issue the AT+CHLD=? test command to retrieve the information about how
945            # the call hold and multiparty services are supported in the AG. The HF
946            # shall not issue the AT+CHLD=? test command in case either the HF or
947            # the AG does not support the "Three-way calling" feature.
948            response = await self.execute_command(
949                "AT+CHLD=?", response_type=AtResponseType.SINGLE
950            )
951
952            self.supported_ag_call_hold_operations = [
953                CallHoldOperation(operation.decode())
954                for operation in response.parameters[0]
955            ]
956
957        # 4.2.1.4 HF Indicators
958        # If the HF supports the HF indicator feature, it shall check the +BRSF
959        # response to see if the AG also supports the HF Indicator feature.
960        if self.supports_hf_feature(
961            HfFeature.HF_INDICATORS
962        ) and self.supports_ag_feature(AgFeature.HF_INDICATORS):
963            # If both the HF and AG support the HF Indicator feature, then the HF
964            # shall send the AT+BIND=<HF supported HF indicators> command to the AG
965            # to notify the AG of the supported indicators’ assigned numbers in the
966            # HF. The AG shall respond with OK
967            indicators = [str(i.value) for i in self.hf_indicators]
968            await self.execute_command(f"AT+BIND={','.join(indicators)}")
969
970            # After having provided the AG with the HF indicators it supports,
971            # the HF shall send the AT+BIND=? to request HF indicators supported
972            # by the AG. The AG shall reply with the +BIND response listing all
973            # HF indicators that it supports followed by an OK.
974            response = await self.execute_command(
975                "AT+BIND=?", response_type=AtResponseType.SINGLE
976            )
977
978            logger.info("supported HF indicators:")
979            for indicator in response.parameters[0]:
980                indicator = HfIndicator(int(indicator))
981                logger.info(f"  - {indicator.name}")
982                if indicator in self.hf_indicators:
983                    self.hf_indicators[indicator].supported = True
984
985            # Once the HF receives the supported HF indicators list from the AG,
986            # the HF shall send the AT+BIND? command to determine which HF
987            # indicators are enabled. The AG shall respond with one or more
988            # +BIND responses. The AG shall terminate the list with OK.
989            # (See Section 4.36.1.3).
990            responses = await self.execute_command(
991                "AT+BIND?", response_type=AtResponseType.MULTIPLE
992            )
993
994            logger.info("enabled HF indicators:")
995            for response in responses:
996                indicator = HfIndicator(int(response.parameters[0]))
997                enabled = int(response.parameters[1]) != 0
998                logger.info(f"  - {indicator.name}: {enabled}")
999                if indicator in self.hf_indicators:
1000                    self.hf_indicators[indicator].enabled = True
1001
1002        logger.info("SLC setup completed")
1003        self._slc_initialized = True
1004
1005    async def setup_audio_connection(self):
1006        """4.11.2 Audio Connection Setup by HF."""
1007
1008        # When the HF triggers the establishment of the Codec Connection it
1009        # shall send the AT command AT+BCC to the AG. The AG shall respond with
1010        # OK if it will start the Codec Connection procedure, and with ERROR
1011        # if it cannot start the Codec Connection procedure.
1012        await self.execute_command("AT+BCC")
1013
1014    async def setup_codec_connection(self, codec_id: int):
1015        """4.11.3 Codec Connection Setup."""
1016        # The AG shall send a +BCS=<Codec ID> unsolicited response to the HF.
1017        # The HF shall then respond to the incoming unsolicited response with
1018        # the AT command AT+BCS=<Codec ID>. The ID shall be the same as in the
1019        # unsolicited response code as long as the ID is supported.
1020        # If the received ID is not available, the HF shall respond with
1021        # AT+BAC with its available codecs.
1022        if codec_id not in self.supported_audio_codecs:
1023            codecs = [str(c) for c in self.supported_audio_codecs]
1024            await self.execute_command(f"AT+BAC={','.join(codecs)}")
1025            return
1026
1027        await self.execute_command(f"AT+BCS={codec_id}")
1028
1029        # After sending the OK response, the AG shall open the
1030        # Synchronous Connection with the settings that are determined by the
1031        # ID. The HF shall be ready to accept the synchronous connection
1032        # establishment as soon as it has sent the AT commands AT+BCS=<Codec ID>.
1033        self.active_codec = AudioCodec(codec_id)
1034        self.emit('codec_negotiation', self.active_codec)
1035
1036        logger.info("codec connection setup completed")
1037
1038    async def answer_incoming_call(self):
1039        """4.13.1 Answer Incoming Call from the HF - In-Band Ringing."""
1040        # The user accepts the incoming voice call by using the proper means
1041        # provided by the HF. The HF shall then send the ATA command
1042        # (see Section 4.34) to the AG. The AG shall then begin the procedure for
1043        # accepting the incoming call.
1044        await self.execute_command("ATA")
1045
1046    async def reject_incoming_call(self):
1047        """4.14.1 Reject an Incoming Call from the HF."""
1048        # The user rejects the incoming call by using the User Interface on the
1049        # Hands-Free unit. The HF shall then send the AT+CHUP command
1050        # (see Section 4.34) to the AG. This may happen at any time during the
1051        # procedures described in Sections 4.13.1 and 4.13.2.
1052        await self.execute_command("AT+CHUP")
1053
1054    async def terminate_call(self):
1055        """4.15.1 Terminate a Call Process from the HF."""
1056        # The user may abort the ongoing call process using whatever means
1057        # provided by the Hands-Free unit. The HF shall send AT+CHUP command
1058        # (see Section 4.34) to the AG, and the AG shall then start the
1059        # procedure to terminate or interrupt the current call procedure.
1060        # The AG shall then send the OK indication followed by the +CIEV result
1061        # code, with the value indicating (call=0).
1062        await self.execute_command("AT+CHUP")
1063
1064    async def query_current_calls(self) -> List[CallInfo]:
1065        """4.32.1 Query List of Current Calls in AG.
1066
1067        Return:
1068            List of current calls in AG.
1069        """
1070        responses = await self.execute_command(
1071            "AT+CLCC", response_type=AtResponseType.MULTIPLE
1072        )
1073        assert isinstance(responses, list)
1074
1075        calls = []
1076        for response in responses:
1077            call_info = CallInfo(
1078                index=int(response.parameters[0]),
1079                direction=CallInfoDirection(int(response.parameters[1])),
1080                status=CallInfoStatus(int(response.parameters[2])),
1081                mode=CallInfoMode(int(response.parameters[3])),
1082                multi_party=CallInfoMultiParty(int(response.parameters[4])),
1083            )
1084            if len(response.parameters) >= 6:
1085                call_info.number = response.parameters[5].decode()
1086            if len(response.parameters) >= 7:
1087                call_info.type = int(response.parameters[6])
1088            calls.append(call_info)
1089        return calls
1090
1091    async def update_ag_indicator(self, index: int, value: int):
1092        # CIEV is in 1-index, while ag_indicators is in 0-index.
1093        ag_indicator = self.ag_indicators[index - 1]
1094        ag_indicator.current_status = value
1095        self.emit('ag_indicator', ag_indicator)
1096        logger.info(f"AG indicator updated: {ag_indicator.indicator}, {value}")
1097
1098    async def handle_unsolicited(self):
1099        """Handle unsolicited result codes sent by the audio gateway."""
1100        result = await self.unsolicited_queue.get()
1101        if not result:
1102            raise HfProtocol.HfLoopTermination()
1103        if result.code == "+BCS":
1104            await self.setup_codec_connection(int(result.parameters[0]))
1105        elif result.code == "+CIEV":
1106            await self.update_ag_indicator(
1107                int(result.parameters[0]), int(result.parameters[1])
1108            )
1109        elif result.code == "+VGS":
1110            self.emit('speaker_volume', int(result.parameters[0]))
1111        elif result.code == "+VGM":
1112            self.emit('microphone_volume', int(result.parameters[0]))
1113        elif result.code == "RING":
1114            self.emit('ring')
1115        elif result.code == "+CLIP":
1116            self.emit(
1117                'cli_notification', CallLineIdentification.parse_from(result.parameters)
1118            )
1119        elif result.code == "+BVRA":
1120            # TODO: Support Enhanced Voice Recognition.
1121            self.emit(
1122                'voice_recognition', VoiceRecognitionState(int(result.parameters[0]))
1123            )
1124        else:
1125            logging.info(f"unhandled unsolicited response {result.code}")
1126
1127    async def run(self):
1128        """
1129        Main routine for the Hands-Free side of the HFP protocol.
1130
1131        Initiates the service level connection then loops handling unsolicited AG responses.
1132        """
1133
1134        try:
1135            if not self._slc_initialized:
1136                await self.initiate_slc()
1137            while True:
1138                await self.handle_unsolicited()
1139        except HfProtocol.HfLoopTermination:
1140            logger.info('Loop terminated')
1141        except Exception:
1142            logger.error("HFP-HF protocol failed with the following error:")
1143            logger.error(traceback.format_exc())
1144
1145
class AgProtocol(pyee.EventEmitter):
    """
    Implementation for the Audio-Gateway side of the Hands-Free profile.

    Reference specification Hands-Free Profile v1.8.

    AT commands received on the RFCOMM DLC are dispatched by name to the
    matching `_on_<code>`, `_on_<code>_test` or `_on_<code>_read` method.
    Commands without a handler are answered with ERROR.

    Emitted events:
        slc_complete: Emit when SLC procedure is completed.
        codec_negotiation: When codec is renegotiated, notify the new codec.
            Args:
                active_codec: AudioCodec
        hf_indicator: When HF update their indicators, notify the new state.
            Args:
                hf_indicator: HfIndicatorState
        codec_connection_request: Emit when HF sends AT+BCC to request codec connection.
        answer: Emit when HF sends ATA to answer phone call.
        hang_up: Emit when HF sends AT+CHUP to hang up phone call.
        dial: Emit when HF sends ATD to dial phone call.
            Args:
                number: str
        voice_recognition: Emit when HF requests voice recognition state.
            Args:
                vrec: VoiceRecognitionState
        call_hold: Emit when HF requests call hold operation.
            Args:
                operation: CallHoldOperation
                call_index: Optional[int]
        speaker_volume: Emitted when HF reports its speaker volume (AT+VGS).
            Args:
                volume: Int
        microphone_volume: Emitted when HF reports its microphone volume (AT+VGM).
            Args:
                volume: Int
    """

    supported_hf_features: int
    supported_hf_indicators: Set[HfIndicator]
    supported_audio_codecs: List[AudioCodec]

    supported_ag_features: int
    supported_ag_call_hold_operations: List[CallHoldOperation]

    ag_indicators: List[AgIndicatorState]
    hf_indicators: collections.OrderedDict[HfIndicator, HfIndicatorState]

    dlc: rfcomm.DLC

    read_buffer: bytearray
    active_codec: AudioCodec
    calls: List[CallInfo]

    indicator_report_enabled: bool
    inband_ringtone_enabled: bool
    cme_error_enabled: bool
    cli_notification_enabled: bool
    call_waiting_enabled: bool
    # Optional SLC steps (negotiated through AT+BRSF) which are still pending.
    _remained_slc_setup_features: Set[HfFeature]

    def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None:
        """Creates the protocol instance and attaches it to the RFCOMM DLC.

        Args:
            dlc: RFCOMM channel used to exchange AT commands with the HF.
            configuration: Local (AG) features, indicators and call hold
                operations.
        """
        super().__init__()

        # Configure internal state.
        self.dlc = dlc
        self.read_buffer = bytearray()
        self.active_codec = AudioCodec.CVSD
        self.calls = []

        # Build local features.
        self.supported_ag_features = sum(configuration.supported_ag_features)
        self.supported_ag_call_hold_operations = list(
            configuration.supported_ag_call_hold_operations
        )
        self.ag_indicators = list(configuration.supported_ag_indicators)
        self.supported_hf_indicators = set(configuration.supported_hf_indicators)
        self.inband_ringtone_enabled = True
        self._remained_slc_setup_features = set()

        # Clear remote features.
        self.supported_hf_features = 0
        self.supported_audio_codecs = []
        self.indicator_report_enabled = False
        self.cme_error_enabled = False
        self.cli_notification_enabled = False
        self.call_waiting_enabled = False

        self.hf_indicators = collections.OrderedDict()

        # Bind the AT reader to the RFCOMM channel.
        self.dlc.sink = self._read_at

    def supports_hf_feature(self, feature: HfFeature) -> bool:
        """Tells whether the peer (HF) declared `feature` in its AT+BRSF."""
        return (self.supported_hf_features & feature) != 0

    def supports_ag_feature(self, feature: AgFeature) -> bool:
        """Tells whether the local configuration (AG) includes `feature`."""
        return (self.supported_ag_features & feature) != 0

    def _read_at(self, data: bytes):
        """
        Reads AT messages from the RFCOMM channel.

        The received bytes are accumulated, then every complete command
        (terminated by a carriage return) is parsed and dispatched to its
        handler. An incomplete command stays in the buffer until the rest of
        it is received.
        """
        # Append to the read buffer.
        self.read_buffer.extend(data)

        # A single payload may carry more than one command: process all the
        # complete ones, otherwise the extra ones would stay pending until
        # more data is received.
        while (trailer := self.read_buffer.find(b'\r')) != -1:
            # Isolate the command and consume its bytes *before* parsing, so
            # that a command which fails to be parsed or handled is not seen
            # again the next time data is received.
            raw_command = self.read_buffer[:trailer]
            self.read_buffer = self.read_buffer[trailer + 1 :]

            command = AtCommand.parse_from(raw_command)
            logger.debug(f"<<< {raw_command.decode()}")

            if command.sub_code == AtCommand.SubCode.TEST:
                handler_name = f'_on_{command.code.lower()}_test'
            elif command.sub_code == AtCommand.SubCode.READ:
                handler_name = f'_on_{command.code.lower()}_read'
            else:
                handler_name = f'_on_{command.code.lower()}'

            if handler := getattr(self, handler_name, None):
                handler(*command.parameters)
            else:
                logger.warning('Handler %s not found', handler_name)
                self.send_response('ERROR')

    def send_response(self, response: str) -> None:
        """Sends an AT response."""
        self.dlc.write(f'\r\n{response}\r\n')

    def send_cme_error(self, error_code: CmeError) -> None:
        """Sends an CME ERROR response.

        If CME Error is not enabled by HF, sends ERROR instead.
        """
        if self.cme_error_enabled:
            self.send_response(f'+CME ERROR: {error_code.value}')
        else:
            self.send_error()

    def send_ok(self) -> None:
        """Sends an OK response."""
        self.send_response('OK')

    def send_error(self) -> None:
        """Sends an ERROR response."""
        self.send_response('ERROR')

    def set_inband_ringtone_enabled(self, enabled: bool) -> None:
        """Enables or disables in-band ringtone, and notifies the HF (+BSIR)."""

        self.inband_ringtone_enabled = enabled
        self.send_response(f'+BSIR: {1 if enabled else 0}')

    def set_speaker_volume(self, level: int) -> None:
        """Reports speaker volume."""

        self.send_response(f'+VGS: {level}')

    def set_microphone_volume(self, level: int) -> None:
        """Reports microphone volume."""

        self.send_response(f'+VGM: {level}')

    def send_ring(self) -> None:
        """Sends RING command to trigger ringtone on HF."""

        self.send_response('RING')

    def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None:
        """Updates AG indicator.

        The new value is stored and sent to the HF in a +CIEV response. A
        warning is logged (but the response is still sent) when the indicator
        report or the indicator itself is disabled.

        Args:
            indicator: Name of the indicator.
            value: new value of the indicator.

        Raises:
            KeyError: if the indicator is not one of the local AG indicators.
        """

        search_result = next(
            (
                (index, state)
                for index, state in enumerate(self.ag_indicators)
                if state.indicator == indicator
            ),
            None,
        )
        if not search_result:
            raise KeyError(f'{indicator} is not supported.')

        index, indicator_state = search_result
        if not self.indicator_report_enabled:
            logger.warning('AG indicator report is disabled')
        if not indicator_state.enabled:
            logger.warning(f'AG indicator {indicator} is disabled')

        indicator_state.current_status = value
        # +CIEV is in 1-index, while ag_indicators is in 0-index.
        self.send_response(f'+CIEV: {index+1},{value}')

    async def negotiate_codec(self, codec: AudioCodec) -> None:
        """Starts codec negotiation.

        Sends +BCS with the requested codec, then waits for the
        'codec_negotiation' event, which is emitted when the HF answers with
        AT+BCS.

        Args:
            codec: Codec proposed to the HF.

        Raises:
            HfpProtocolError: if the HF confirms another codec.
        """

        if not self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION):
            logger.warning('Local does not support Codec Negotiation')
        if not self.supports_hf_feature(HfFeature.CODEC_NEGOTIATION):
            logger.warning('Peer does not support Codec Negotiation')
        if codec not in self.supported_audio_codecs:
            logger.warning(f'{codec} is not supported by peer')

        at_bcs_future = asyncio.get_running_loop().create_future()
        self.once('codec_negotiation', at_bcs_future.set_result)
        self.send_response(f'+BCS: {codec.value}')
        if (new_codec := await at_bcs_future) != codec:
            raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}')

    def send_cli_notification(self, cli: CallLineIdentification) -> None:
        """Sends +CLIP CLI notification."""

        if not self.cli_notification_enabled:
            logger.warning('Try to send CLIP while CLI notification is not enabled')

        self.send_response(f'+CLIP: {cli.to_clip_string()}')

    def _check_remained_slc_commands(self) -> None:
        """Emits 'slc_complete' when no optional SLC step is pending."""
        if not self._remained_slc_setup_features:
            self.emit('slc_complete')

    def _on_brsf(self, hf_features: bytes) -> None:
        """AT+BRSF handler: exchanges the supported features bitmaps."""
        self.supported_hf_features = int(hf_features)
        self.send_response(f'+BRSF: {self.supported_ag_features}')
        self.send_ok()

        # Remember which optional SLC steps are expected, given the features
        # supported by both sides.
        if self.supports_hf_feature(
            HfFeature.HF_INDICATORS
        ) and self.supports_ag_feature(AgFeature.HF_INDICATORS):
            self._remained_slc_setup_features.add(HfFeature.HF_INDICATORS)

        if self.supports_hf_feature(
            HfFeature.THREE_WAY_CALLING
        ) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
            self._remained_slc_setup_features.add(HfFeature.THREE_WAY_CALLING)

    def _on_bac(self, *args) -> None:
        """AT+BAC handler: stores the codecs available in the HF."""
        self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
        self.send_ok()

    def _on_bcs(self, codec: bytes) -> None:
        """AT+BCS handler: the HF confirms the codec to use."""
        self.active_codec = AudioCodec(int(codec))
        self.send_ok()
        self.emit('codec_negotiation', self.active_codec)

    def _on_bvra(self, vrec: bytes) -> None:
        """AT+BVRA handler: voice recognition request from the HF."""
        self.send_ok()
        self.emit('voice_recognition', VoiceRecognitionState(int(vrec)))

    def _on_chld(self, operation_code: bytes) -> None:
        """AT+CHLD handler: call hold operation requested by the HF.

        The first character of `operation_code` is the operation; when more
        characters follow, they are the index of the call the operation
        applies to (the operation is then looked up as `<n>x`).

        Exactly one final response is sent: an error when the request is
        rejected, OK otherwise (followed by a 'call_hold' event).
        """
        call_index: Optional[int] = None
        try:
            if len(operation_code) > 1:
                call_index = int(operation_code[1:])
                operation_code = operation_code[:1] + b'x'
            operation = CallHoldOperation(operation_code.decode())
        except ValueError:
            # Raised for a non-numeric call index, for bytes which cannot be
            # decoded, and for values which are not a CallHoldOperation.
            logger.error(f'Invalid operation: {operation_code!r}')
            self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED)
            return

        if operation not in self.supported_ag_call_hold_operations:
            logger.error(f'Unsupported operation: {operation_code.decode()}')
            self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED)
            # The error is the final response: do not send OK as well.
            return

        if call_index is not None and not any(
            call.index == call_index for call in self.calls
        ):
            logger.error(f'No matching call {call_index}')
            self.send_cme_error(CmeError.INVALID_INDEX)
            # The error is the final response: do not send OK as well.
            return

        # Real three-way calls have more complicated situations, but this is not a popular issue - let users to handle the remaining :)

        self.send_ok()
        self.emit('call_hold', operation, call_index)

    def _on_chld_test(self) -> None:
        """AT+CHLD=? handler: lists the supported call hold operations."""
        if not self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
            self.send_error()
            return

        self.send_response(
            '+CHLD: ({})'.format(
                ','.join(
                    operation.value
                    for operation in self.supported_ag_call_hold_operations
                )
            )
        )
        self.send_ok()

        # Only count this command as an SLC step when it is expected: the HF
        # may send it again later, or without having declared the feature,
        # in which case there is nothing to remove (and nothing to complete).
        if HfFeature.THREE_WAY_CALLING in self._remained_slc_setup_features:
            self._remained_slc_setup_features.remove(HfFeature.THREE_WAY_CALLING)
            self._check_remained_slc_commands()

    def _on_cind_test(self) -> None:
        """AT+CIND=? handler: describes the AG indicators."""
        if not self.ag_indicators:
            self.send_cme_error(CmeError.NOT_FOUND)
            return

        indicator_list_str = ",".join(
            indicator.on_test_text for indicator in self.ag_indicators
        )
        self.send_response(f'+CIND: {indicator_list_str}')
        self.send_ok()

    def _on_cind_read(self) -> None:
        """AT+CIND? handler: reports the current status of the AG indicators."""
        if not self.ag_indicators:
            self.send_cme_error(CmeError.NOT_FOUND)
            return

        indicator_list_str = ",".join(
            str(indicator.current_status) for indicator in self.ag_indicators
        )
        self.send_response(f'+CIND: {indicator_list_str}')
        self.send_ok()

        self._check_remained_slc_commands()

    def _on_cmer(
        self,
        mode: bytes,
        keypad: Optional[bytes] = None,
        display: Optional[bytes] = None,
        indicator: bytes = b'',
    ) -> None:
        """AT+CMER handler: enables or disables the indicator status report.

        Only mode 3, with keypad and display reports omitted or set to 0, and
        an indicator report of 0 or 1 is accepted. Anything else is answered
        with an error and leaves the current setting unchanged.
        """
        try:
            is_valid = (
                int(mode) == 3
                and not (keypad and int(keypad))
                and not (display and int(display))
                and int(indicator) in (0, 1)
            )
        except ValueError:
            # Missing or non-numeric value.
            is_valid = False

        if not is_valid:
            logger.error(
                f'Unexpected values: mode={mode!r}, keypad={keypad!r}, '
                f'display={display!r}, indicator={indicator!r}'
            )
            self.send_cme_error(CmeError.INVALID_INDEX)
            # The error is the final response: do not send OK as well.
            return

        self.indicator_report_enabled = bool(int(indicator))
        self.send_ok()

    def _on_cmee(self, enabled: bytes) -> None:
        """AT+CMEE handler: enables or disables +CME ERROR responses."""
        self.cme_error_enabled = bool(int(enabled))
        self.send_ok()

    def _on_ccwa(self, enabled: bytes) -> None:
        """AT+CCWA handler: enables or disables call waiting notification."""
        self.call_waiting_enabled = bool(int(enabled))
        self.send_ok()

    def _on_bind(self, *args) -> None:
        """AT+BIND=<...> handler: receives the HF indicators of the HF.

        Keeps a state for the indicators supported by both sides.
        """
        if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
            self.send_error()
            return

        peer_supported_indicators = set(
            HfIndicator(int(indicator)) for indicator in args
        )
        self.hf_indicators = collections.OrderedDict(
            {
                indicator: HfIndicatorState(indicator=indicator)
                for indicator in self.supported_hf_indicators.intersection(
                    peer_supported_indicators
                )
            }
        )
        self.send_ok()

    def _on_bind_test(self) -> None:
        """AT+BIND=? handler: lists the HF indicators supported by the AG."""
        if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
            self.send_error()
            return

        hf_indicator_list_str = ",".join(
            str(indicator.value) for indicator in self.supported_hf_indicators
        )
        self.send_response(f'+BIND: ({hf_indicator_list_str})')
        self.send_ok()

    def _on_bind_read(self) -> None:
        """AT+BIND? handler: reports the HF indicators as enabled."""
        if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
            self.send_error()
            return

        for indicator in self.hf_indicators:
            self.send_response(f'+BIND: {indicator.value},1')

        self.send_ok()

        # Only count this command as an SLC step when it is expected: the HF
        # may send it again later, or without having declared the feature,
        # in which case there is nothing to remove (and nothing to complete).
        if HfFeature.HF_INDICATORS in self._remained_slc_setup_features:
            self._remained_slc_setup_features.remove(HfFeature.HF_INDICATORS)
            self._check_remained_slc_commands()

    def _on_biev(self, index_bytes: bytes, value_bytes: bytes) -> None:
        """AT+BIEV handler: the HF updates the value of an HF indicator."""
        if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
            self.send_error()
            return

        index = HfIndicator(int(index_bytes))
        if index not in self.hf_indicators:
            self.send_error()
            return

        self.hf_indicators[index].current_status = int(value_bytes)
        self.emit('hf_indicator', self.hf_indicators[index])
        self.send_ok()

    def _on_bia(self, *args) -> None:
        """AT+BIA handler: activates or deactivates individual AG indicators.

        Values are matched by position with the AG indicators. An omitted
        value leaves the corresponding indicator unchanged.
        """
        for enabled, state in zip(args, self.ag_indicators):
            # Assumes an omitted field is parsed as an empty value, as the
            # AT+CMER handler also expects - TODO confirm against the parser.
            if enabled:
                state.enabled = bool(int(enabled))
        self.send_ok()

    def _on_bcc(self) -> None:
        """AT+BCC handler: the HF requests a codec connection."""
        self.emit('codec_connection_request')
        self.send_ok()

    def _on_a(self) -> None:
        """ATA handler."""
        self.emit('answer')
        self.send_ok()

    def _on_d(self, number: bytes) -> None:
        """ATD handler."""
        self.emit('dial', number.decode())
        self.send_ok()

    def _on_chup(self) -> None:
        """AT+CHUP handler."""
        self.emit('hang_up')
        self.send_ok()

    def _on_clcc(self) -> None:
        """AT+CLCC handler: sends one +CLCC response per current call."""
        for call in self.calls:
            # Number and type are optional trailing parameters.
            number_text = f',"{call.number}"' if call.number is not None else ''
            type_text = f',{call.type}' if call.type is not None else ''
            response = (
                f'+CLCC: {call.index}'
                f',{call.direction.value}'
                f',{call.status.value}'
                f',{call.mode.value}'
                f',{call.multi_party.value}'
                f'{number_text}'
                f'{type_text}'
            )
            self.send_response(response)
        self.send_ok()

    def _on_clip(self, enabled: bytes) -> None:
        """AT+CLIP handler: enables or disables CLI notification."""
        if not self.supports_hf_feature(HfFeature.CLI_PRESENTATION_CAPABILITY):
            logger.error('Remote doesn not support CLI but sends AT+CLIP')
        self.cli_notification_enabled = enabled == b'1'
        self.send_ok()

    def _on_vgs(self, level: bytes) -> None:
        """AT+VGS handler: the HF reports its speaker volume."""
        self.emit('speaker_volume', int(level))
        self.send_ok()

    def _on_vgm(self, level: bytes) -> None:
        """AT+VGM handler: the HF reports its microphone volume."""
        self.emit('microphone_volume', int(level))
        self.send_ok()
1606
1607
1608# -----------------------------------------------------------------------------
1609# Normative SDP definitions
1610# -----------------------------------------------------------------------------
1611
1612
class ProfileVersion(enum.IntEnum):
    """
    Hands-Free profile version, as advertised in the SDP record (normative).

    See Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
    """

    V1_5 = 0x0105  # Version 1.5
    V1_6 = 0x0106  # Version 1.6
    V1_7 = 0x0107  # Version 1.7
    V1_8 = 0x0108  # Version 1.8
    V1_9 = 0x0109  # Version 1.9
1625
1626
class HfSdpFeature(enum.IntFlag):
    """
    Bits of the "supported features" SDP attribute of an HF (normative).

    See Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
    """

    # Echo Cancel & Noise reduction
    EC_NR = 1 << 0
    THREE_WAY_CALLING = 1 << 1
    CLI_PRESENTATION_CAPABILITY = 1 << 2
    VOICE_RECOGNITION_ACTIVATION = 1 << 3
    REMOTE_VOLUME_CONTROL = 1 << 4
    # Wide band speech
    WIDE_BAND = 1 << 5
    ENHANCED_VOICE_RECOGNITION_STATUS = 1 << 6
    VOICE_RECOGNITION_TEST = 1 << 7
1642
1643
class AgSdpFeature(enum.IntFlag):
    """
    Bits of the "supported features" SDP attribute of an AG (normative).

    See Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
    """

    THREE_WAY_CALLING = 1 << 0
    # Echo Cancel & Noise reduction
    EC_NR = 1 << 1
    VOICE_RECOGNITION_FUNCTION = 1 << 2
    IN_BAND_RING_TONE_CAPABILITY = 1 << 3
    # Attach a number to voice tag
    VOICE_TAG = 1 << 4
    # Wide band speech
    WIDE_BAND = 1 << 5
    ENHANCED_VOICE_RECOGNITION_STATUS = 1 << 6
    VOICE_RECOGNITION_TEST = 1 << 7
1659
1660
def make_hf_sdp_records(
    service_record_handle: int,
    rfcomm_channel: int,
    configuration: HfConfiguration,
    version: ProfileVersion = ProfileVersion.V1_8,
) -> List[sdp.ServiceAttribute]:
    """
    Builds the SDP record advertising HFP Hands-Free support.

    The supported features attribute is derived from the HF features and the
    audio codecs of the configuration.

    Args:
        service_record_handle: Handle of the service record.
        rfcomm_channel: RFCOMM channel allocated to the service.
        configuration: HF configuration the advertised features are taken from.
        version: Profile version to advertise.

    Returns:
        The attributes of the service record.
    """

    # HF features which have a counterpart in the SDP supported features.
    feature_mapping = (
        (HfFeature.EC_NR, HfSdpFeature.EC_NR),
        (HfFeature.THREE_WAY_CALLING, HfSdpFeature.THREE_WAY_CALLING),
        (
            HfFeature.CLI_PRESENTATION_CAPABILITY,
            HfSdpFeature.CLI_PRESENTATION_CAPABILITY,
        ),
        (
            HfFeature.VOICE_RECOGNITION_ACTIVATION,
            HfSdpFeature.VOICE_RECOGNITION_ACTIVATION,
        ),
        (HfFeature.REMOTE_VOLUME_CONTROL, HfSdpFeature.REMOTE_VOLUME_CONTROL),
        (
            HfFeature.ENHANCED_VOICE_RECOGNITION_STATUS,
            HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS,
        ),
        (HfFeature.VOICE_RECOGNITION_TEST, HfSdpFeature.VOICE_RECOGNITION_TEST),
    )

    sdp_feature_bits = 0
    for hf_feature, sdp_feature in feature_mapping:
        if hf_feature in configuration.supported_hf_features:
            sdp_feature_bits |= sdp_feature

    # Wide band speech is derived from the codecs, not from an HF feature.
    if AudioCodec.MSBC in configuration.supported_audio_codecs:
        sdp_feature_bits |= HfSdpFeature.WIDE_BAND

    record_handle_attribute = sdp.ServiceAttribute(
        sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
        sdp.DataElement.unsigned_integer_32(service_record_handle),
    )

    service_classes_attribute = sdp.ServiceAttribute(
        sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
        sdp.DataElement.sequence(
            [
                sdp.DataElement.uuid(BT_HANDSFREE_SERVICE),
                sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
            ]
        ),
    )

    # L2CAP, then RFCOMM with the allocated channel.
    l2cap_descriptor = sdp.DataElement.sequence(
        [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
    )
    rfcomm_descriptor = sdp.DataElement.sequence(
        [
            sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
            sdp.DataElement.unsigned_integer_8(rfcomm_channel),
        ]
    )
    protocols_attribute = sdp.ServiceAttribute(
        sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
        sdp.DataElement.sequence([l2cap_descriptor, rfcomm_descriptor]),
    )

    profile_descriptor = sdp.DataElement.sequence(
        [
            sdp.DataElement.uuid(BT_HANDSFREE_SERVICE),
            sdp.DataElement.unsigned_integer_16(version),
        ]
    )
    profiles_attribute = sdp.ServiceAttribute(
        sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
        sdp.DataElement.sequence([profile_descriptor]),
    )

    features_attribute = sdp.ServiceAttribute(
        sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
        sdp.DataElement.unsigned_integer_16(sdp_feature_bits),
    )

    return [
        record_handle_attribute,
        service_classes_attribute,
        protocols_attribute,
        profiles_attribute,
        features_attribute,
    ]
1745
1746
def make_ag_sdp_records(
    service_record_handle: int,
    rfcomm_channel: int,
    configuration: AgConfiguration,
    version: ProfileVersion = ProfileVersion.V1_8,
) -> List[sdp.ServiceAttribute]:
    """
    Builds the SDP record describing HFP Audio-Gateway support.

    Args:
        service_record_handle: handle placed in the service record handle
            attribute.
        rfcomm_channel: RFCOMM channel published in the protocol descriptor
            list.
        configuration: AG configuration; its supported features and audio
            codecs are translated into the SDP supported features bit field.
        version: profile version published in the profile descriptor list.

    Returns:
        The SDP attributes making up the record.
    """

    # Each AG feature paired with the SDP feature bit that advertises it.
    sdp_feature_for_ag_feature = (
        (AgFeature.EC_NR, AgSdpFeature.EC_NR),
        (AgFeature.THREE_WAY_CALLING, AgSdpFeature.THREE_WAY_CALLING),
        (
            AgFeature.ENHANCED_VOICE_RECOGNITION_STATUS,
            AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS,
        ),
        (AgFeature.VOICE_RECOGNITION_TEST, AgSdpFeature.VOICE_RECOGNITION_TEST),
        (
            AgFeature.IN_BAND_RING_TONE_CAPABILITY,
            AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY,
        ),
        (
            AgFeature.VOICE_RECOGNITION_FUNCTION,
            AgSdpFeature.VOICE_RECOGNITION_FUNCTION,
        ),
    )

    ag_supported_features = 0
    for ag_feature, sdp_feature in sdp_feature_for_ag_feature:
        if ag_feature in configuration.supported_ag_features:
            ag_supported_features |= sdp_feature
    # The wide band bit is driven by codec support, not by an AG feature.
    if AudioCodec.MSBC in configuration.supported_audio_codecs:
        ag_supported_features |= AgSdpFeature.WIDE_BAND

    service_class_id_list = sdp.DataElement.sequence(
        [
            sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
            sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
        ]
    )

    # Layout: [[L2CAP], [RFCOMM, <channel>]]
    l2cap_descriptor = sdp.DataElement.sequence(
        [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
    )
    rfcomm_descriptor = sdp.DataElement.sequence(
        [
            sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
            sdp.DataElement.unsigned_integer_8(rfcomm_channel),
        ]
    )
    protocol_descriptor_list = sdp.DataElement.sequence(
        [l2cap_descriptor, rfcomm_descriptor]
    )

    # Layout: [[<profile UUID>, <version>]]
    profile_descriptor = sdp.DataElement.sequence(
        [
            sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
            sdp.DataElement.unsigned_integer_16(version),
        ]
    )
    profile_descriptor_list = sdp.DataElement.sequence([profile_descriptor])

    return [
        sdp.ServiceAttribute(
            sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
            sdp.DataElement.unsigned_integer_32(service_record_handle),
        ),
        sdp.ServiceAttribute(
            sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
            service_class_id_list,
        ),
        sdp.ServiceAttribute(
            sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
            protocol_descriptor_list,
        ),
        sdp.ServiceAttribute(
            sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
            profile_descriptor_list,
        ),
        sdp.ServiceAttribute(
            sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
            sdp.DataElement.unsigned_integer_16(ag_supported_features),
        ),
    ]
1828
1829
async def find_hf_sdp_record(
    connection: device.Connection,
) -> Optional[Tuple[int, ProfileVersion, HfSdpFeature]]:
    """Looks up a Hands-Free SDP record on the remote device.

    Args:
        connection: ACL connection used to run the SDP search.

    Returns:
        Tuple of (<RFCOMM channel>, <Profile Version>, <HF SDP features>) taken
        from the first record that provides all three, or None when no such
        record is found.
    """
    async with sdp.Client(connection) as sdp_client:
        records = await sdp_client.search_attributes(
            uuids=[BT_HANDSFREE_SERVICE],
            attribute_ids=[
                sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
                sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
            ],
        )
        for record in records:
            channel: Optional[int] = None
            version: Optional[ProfileVersion] = None
            features: Optional[HfSdpFeature] = None
            for attribute in record:
                attribute_id = attribute.id
                if attribute_id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
                    # The layout is
                    # [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
                    channel = attribute.value.value[1].value[1].value
                elif (
                    attribute_id
                    == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
                ):
                    # The version is the second element of the first descriptor.
                    version = ProfileVersion(attribute.value.value[0].value[1].value)
                elif attribute_id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
                    features = HfSdpFeature(attribute.value.value)
                elif attribute_id == sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID:
                    first_class_id = attribute.value.value[0].value
                    # AG record may also contain HF UUID in its profile
                    # descriptor list. Abandon such a record and move on to
                    # the next one.
                    if first_class_id == BT_HANDSFREE_AUDIO_GATEWAY_SERVICE:
                        break
            else:
                # Only reached when the record was not abandoned above.
                if channel is None or version is None or features is None:
                    continue
                return (channel, version, features)
    return None
1880
1881
async def find_ag_sdp_record(
    connection: device.Connection,
) -> Optional[Tuple[int, ProfileVersion, AgSdpFeature]]:
    """Searches an Audio-Gateway SDP record from remote device.

    Every record returned by the search is examined in order. A record that
    does not provide a channel, a version and a feature set is logged and
    skipped, so a later complete record can still be used.

    Args:
        connection: ACL connection to make SDP search.

    Returns:
        Tuple of (<RFCOMM channel>, <Profile Version>, <AG SDP features>) from
        the first complete record, or None if no record is complete.
    """
    async with sdp.Client(connection) as sdp_client:
        search_result = await sdp_client.search_attributes(
            uuids=[BT_HANDSFREE_AUDIO_GATEWAY_SERVICE],
            attribute_ids=[
                sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
            ],
        )
        for attribute_lists in search_result:
            channel: Optional[int] = None
            version: Optional[ProfileVersion] = None
            features: Optional[AgSdpFeature] = None
            for attribute in attribute_lists:
                # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
                if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
                    protocol_descriptor_list = attribute.value.value
                    channel = protocol_descriptor_list[1].value[1].value
                elif (
                    attribute.id
                    == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
                ):
                    profile_descriptor_list = attribute.value.value
                    version = ProfileVersion(profile_descriptor_list[0].value[1].value)
                elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
                    features = AgSdpFeature(attribute.value.value)
            if not channel or not version or features is None:
                # Do not give up on the whole search because of one incomplete
                # record: report it and try the next one.
                logger.warning(f"Bad result {attribute_lists}.")
                continue
            return (channel, version, features)
    return None
1924
1925
1926# -----------------------------------------------------------------------------
1927# ESCO Codec Default Parameters
1928# -----------------------------------------------------------------------------
1929
1930
1931# Hands-Free Profile v1.8, 5.7 Codec Interoperability Requirements
class DefaultCodecParameters(enum.IntEnum):
    """Identifiers of the default SCO/eSCO parameter sets."""

    # Values are numbered consecutively from 1, in declaration order.

    # CVSD over SCO.
    SCO_CVSD_D0 = 1
    SCO_CVSD_D1 = 2

    # CVSD over eSCO.
    ESCO_CVSD_S1 = 3
    ESCO_CVSD_S2 = 4
    ESCO_CVSD_S3 = 5
    ESCO_CVSD_S4 = 6

    # mSBC over eSCO.
    ESCO_MSBC_T1 = 7
    ESCO_MSBC_T2 = 8
1941
1942
@dataclasses.dataclass
class EscoParameters:
    """Bundle of parameters describing one SCO/eSCO configuration.

    The first group of fields has no default and must be given for every
    parameter set; the second group carries defaults shared by the sets.
    """

    # Codec specific
    transmit_coding_format: CodingFormat
    receive_coding_format: CodingFormat
    packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType
    retransmission_effort: (
        HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort
    )
    max_latency: int

    # Common
    input_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
    output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
    input_coded_data_size: int = 16
    output_coded_data_size: int = 16
    input_pcm_data_format: (
        HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat
    ) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
    output_pcm_data_format: (
        HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat
    ) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
    input_pcm_sample_payload_msb_position: int = 0
    output_pcm_sample_payload_msb_position: int = 0
    input_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = (
        HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath.HCI
    )
    output_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = (
        HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath.HCI
    )
    input_transport_unit_size: int = 0
    output_transport_unit_size: int = 0
    input_bandwidth: int = 16000
    output_bandwidth: int = 16000
    transmit_bandwidth: int = 8000
    receive_bandwidth: int = 8000
    transmit_codec_frame_size: int = 60
    receive_codec_frame_size: int = 60

    def asdict(self) -> Dict[str, Any]:
        """Returns the instance attributes as a new dictionary.

        The copy is shallow: the dictionary itself is new, so adding, removing
        or rebinding keys in it does not affect this instance, while the values
        are the same objects as the ones held by the instance.

        Returns:
            A new ``{attribute name: value}`` dictionary.
        """
        # dataclasses.asdict() will recursively deep-copy the entire object,
        # which is expensive and breaks CodingFormat object, so make a shallow
        # copy here. Returning self.__dict__ itself would let callers modify
        # this instance (possibly a shared module-level default) through the
        # returned dictionary.
        return dict(self.__dict__)
1986
1987
# D0: CVSD, HV1 packets, no retransmission.
_ESCO_PARAMETERS_CVSD_D0 = EscoParameters(
    transmit_coding_format=CodingFormat(CodecID.CVSD),
    receive_coding_format=CodingFormat(CodecID.CVSD),
    packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV1,
    retransmission_effort=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION
    ),
    max_latency=0xFFFF,
)

# D1: CVSD, HV3 packets, no retransmission.
_ESCO_PARAMETERS_CVSD_D1 = EscoParameters(
    transmit_coding_format=CodingFormat(CodecID.CVSD),
    receive_coding_format=CodingFormat(CodecID.CVSD),
    packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV3,
    retransmission_effort=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION
    ),
    max_latency=0xFFFF,
)

# S1: CVSD, EV3 with all four NO_* packet flags set, optimized for power.
_ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
    transmit_coding_format=CodingFormat(CodecID.CVSD),
    receive_coding_format=CodingFormat(CodecID.CVSD),
    packet_type=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
    ),
    retransmission_effort=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_POWER
    ),
    max_latency=0x0007,
)

# S2: like S1, but without the NO_2_EV3 packet flag.
_ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
    transmit_coding_format=CodingFormat(CodecID.CVSD),
    receive_coding_format=CodingFormat(CodecID.CVSD),
    packet_type=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
    ),
    retransmission_effort=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_POWER
    ),
    max_latency=0x0007,
)

# S3: same packet types and retransmission effort as S2, larger max latency.
_ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
    transmit_coding_format=CodingFormat(CodecID.CVSD),
    receive_coding_format=CodingFormat(CodecID.CVSD),
    packet_type=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
    ),
    retransmission_effort=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_POWER
    ),
    max_latency=0x000A,
)

# S4: same packet types as S2/S3, optimized for quality, largest max latency
# of the CVSD eSCO sets.
_ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
    transmit_coding_format=CodingFormat(CodecID.CVSD),
    receive_coding_format=CodingFormat(CodecID.CVSD),
    packet_type=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
    ),
    retransmission_effort=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY
    ),
    max_latency=0x000C,
)
2056
# T1: mSBC over EV3 packets only. Like the EV3-only CVSD set (S1), the packet
# mask carries every NO_* flag, including NO_2_EV3.
# NOTE(review): per Hands-Free Profile v1.8, 5.7, T1 uses EV3 and T2 uses
# 2-EV3; the NO_2_EV3 flag was previously set on T2 instead of T1.
_ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
    transmit_coding_format=CodingFormat(CodecID.MSBC),
    receive_coding_format=CodingFormat(CodecID.MSBC),
    max_latency=0x0008,
    packet_type=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
    ),
    input_bandwidth=32000,
    output_bandwidth=32000,
    retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)

# T2: mSBC with 2-EV3 packets allowed. Like the 2-EV3 CVSD sets (S2-S4), the
# packet mask leaves NO_2_EV3 unset.
_ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
    transmit_coding_format=CodingFormat(CodecID.MSBC),
    receive_coding_format=CodingFormat(CodecID.MSBC),
    max_latency=0x000D,
    packet_type=(
        HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
        | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
    ),
    input_bandwidth=32000,
    output_bandwidth=32000,
    retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
2087
# Lookup table from each DefaultCodecParameters member to its EscoParameters
# instance. The values are shared module-level objects, not per-lookup copies.
ESCO_PARAMETERS = {
    DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0,
    DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1,
    DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1,
    DefaultCodecParameters.ESCO_CVSD_S2: _ESCO_PARAMETERS_CVSD_S2,
    DefaultCodecParameters.ESCO_CVSD_S3: _ESCO_PARAMETERS_CVSD_S3,
    DefaultCodecParameters.ESCO_CVSD_S4: _ESCO_PARAMETERS_CVSD_S4,
    DefaultCodecParameters.ESCO_MSBC_T1: _ESCO_PARAMETERS_MSBC_T1,
    DefaultCodecParameters.ESCO_MSBC_T2: _ESCO_PARAMETERS_MSBC_T2,
}
2098