1 /*
2 * Copyright 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "mmc/daemon/service.h"
18
19 #include <base/functional/bind.h>
20 #include <base/functional/callback_helpers.h>
21 #include <base/stl_util.h>
22 #include <base/task/single_thread_task_runner.h>
23 #include <base/unguessable_token.h>
24 #include <bluetooth/log.h>
25 #include <poll.h>
26 #include <sys/socket.h>
27 #include <sys/stat.h>
28 #include <sys/un.h>
29 #include <unistd.h>
30
31 #include <cerrno>
32 #include <cstring>
33 #include <future>
34
35 #include "common/message_loop_thread.h"
36 #include "mmc/codec_server/hfp_lc3_mmc_decoder.h"
37 #include "mmc/codec_server/hfp_lc3_mmc_encoder.h"
38 #include "mmc/daemon/constants.h"
39 #include "mmc/mmc_interface/mmc_interface.h"
40 #include "mmc/proto/mmc_service.pb.h"
41
42 #if !defined(EXCLUDE_NONSTANDARD_CODECS)
43 #include "mmc/codec_server/a2dp_aac_mmc_encoder.h"
44 #endif
45
46 namespace mmc {
47 namespace {
48
49 using namespace bluetooth;
50
51 // Task that would run on the thread.
StartSocketListener(int fd,struct sockaddr_un addr,std::promise<void> task_ended,std::unique_ptr<MmcInterface> codec_server)52 void StartSocketListener(int fd, struct sockaddr_un addr, std::promise<void> task_ended,
53 std::unique_ptr<MmcInterface> codec_server) {
54 socklen_t addr_size = sizeof(struct sockaddr_un);
55 int client_fd = accept(fd, (struct sockaddr*)&addr, &addr_size);
56 // |fd| is only used for accept.
57 close(fd);
58
59 if (client_fd < 0) {
60 log::error("Failed to accept: {}", strerror(errno));
61 codec_server.release();
62 task_ended.set_value();
63 return;
64 }
65
66 std::array<uint8_t, kMaximumBufferSize> i_buf = {};
67 std::array<uint8_t, kMaximumBufferSize> o_buf = {};
68
69 struct pollfd pfd;
70 pfd.fd = client_fd;
71 pfd.events = POLLIN;
72
73 while (1) {
74 // Blocking poll.
75 int poll_ret = poll(&pfd, 1, -1);
76 if (poll_ret <= 0) {
77 log::error("Poll failed: {}", strerror(errno));
78 break;
79 }
80
81 // Ignore remaining data in the closed socket.
82 if (pfd.revents & (POLLHUP | POLLNVAL)) {
83 log::info("Socket disconnected");
84 break;
85 }
86
87 int i_data_len = recv(client_fd, i_buf.data(), kMaximumBufferSize, MSG_NOSIGNAL);
88 if (i_data_len <= 0) {
89 log::error("Failed to recv data: {}", strerror(errno));
90 break;
91 }
92
93 // Start transcode.
94 int o_data_len =
95 codec_server->transcode(i_buf.data(), i_data_len, o_buf.data(), kMaximumBufferSize);
96 if (o_data_len < 0) {
97 log::error("Failed to transcode: {}", strerror(-o_data_len));
98 break;
99 }
100
101 int sent_rc = send(client_fd, o_buf.data(), o_data_len, MSG_NOSIGNAL);
102 if (sent_rc <= 0) {
103 log::error("Failed to send data: {}", strerror(errno));
104 break;
105 }
106 o_buf.fill(0);
107 }
108 close(client_fd);
109 unlink(addr.sun_path);
110 codec_server.release();
111 task_ended.set_value();
112 return;
113 }
114
115 } // namespace
116
Service(base::OnceClosure shutdown_callback)117 Service::Service(base::OnceClosure shutdown_callback)
118 : shutdown_callback_(std::move(shutdown_callback)), weak_ptr_factory_(this) {}
119
Init()120 bool Service::Init() {
121 // Set up the dbus service.
122 dbus::Bus::Options opts;
123 opts.bus_type = dbus::Bus::SYSTEM;
124 bus_ = new dbus::Bus(std::move(opts));
125
126 if (!bus_->Connect()) {
127 log::error("Failed to connect to system bus");
128 return false;
129 }
130
131 exported_object_ = bus_->GetExportedObject(dbus::ObjectPath(kMmcServicePath));
132 if (!exported_object_) {
133 log::error("Failed to export {} object", kMmcServicePath);
134 return false;
135 }
136
137 using ServiceMethod = void (Service::*)(dbus::MethodCall*, dbus::ExportedObject::ResponseSender);
138 const std::map<const char*, ServiceMethod> kServiceMethods = {
139 {kCodecInitMethod, &Service::CodecInit},
140 {kCodecCleanUpMethod, &Service::CodecCleanUp},
141 };
142
143 for (const auto& iter : kServiceMethods) {
144 bool ret = exported_object_->ExportMethodAndBlock(
145 kMmcServiceInterface, iter.first,
146 base::BindRepeating(iter.second, weak_ptr_factory_.GetWeakPtr()));
147 if (!ret) {
148 log::error("Failed to export method: {}", iter.first);
149 return false;
150 }
151 }
152
153 if (!bus_->RequestOwnershipAndBlock(kMmcServiceName, dbus::Bus::REQUIRE_PRIMARY)) {
154 log::error("Failed to take ownership of {}", kMmcServiceName);
155 return false;
156 }
157 return true;
158 }
159
CodecInit(dbus::MethodCall * method_call,dbus::ExportedObject::ResponseSender sender)160 void Service::CodecInit(dbus::MethodCall* method_call,
161 dbus::ExportedObject::ResponseSender sender) {
162 dbus::MessageReader reader(method_call);
163 auto dbus_response = dbus::Response::FromMethodCall(method_call);
164
165 dbus::MessageWriter writer(dbus_response.get());
166
167 CodecInitRequest request;
168 CodecInitResponse response;
169
170 if (!reader.PopArrayOfBytesAsProto(&request)) {
171 std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
172 method_call, kMmcServiceError, "Unable to parse CodecInitRequest from message"));
173 return;
174 }
175
176 if (!request.has_config()) {
177 std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(method_call, kMmcServiceError,
178 "'Config Param' must be set"));
179 return;
180 }
181
182 // Create codec server instance.
183 std::unique_ptr<MmcInterface> codec_server;
184 if (request.config().has_hfp_lc3_decoder_param()) {
185 codec_server = std::make_unique<HfpLc3Decoder>();
186 } else if (request.config().has_hfp_lc3_encoder_param()) {
187 codec_server = std::make_unique<HfpLc3Encoder>();
188 }
189 #if !defined(EXCLUDE_NONSTANDARD_CODECS)
190 else if (request.config().has_a2dp_aac_encoder_param()) {
191 codec_server = std::make_unique<A2dpAacEncoder>();
192 }
193 #endif
194 else {
195 std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(method_call, kMmcServiceError,
196 "Codec type must be specified"));
197 return;
198 }
199
200 int frame_size = codec_server->init(request.config());
201 if (frame_size < 0) {
202 std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
203 method_call, kMmcServiceError,
204 "Init codec server failed: " + std::string(strerror(-frame_size))));
205 return;
206 }
207 response.set_input_frame_size(frame_size);
208
209 // Generate socket name for client.
210 std::string socket_path =
211 std::string(kMmcSocketName) + base::UnguessableToken::Create().ToString();
212 response.set_socket_token(socket_path);
213
214 int skt_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
215 if (skt_fd < 0) {
216 std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
217 method_call, kMmcServiceError,
218 "Create socket failed: " + std::string(strerror(errno))));
219 return;
220 }
221
222 struct sockaddr_un addr = {};
223 addr.sun_family = AF_UNIX;
224 strncpy(addr.sun_path, response.socket_token().c_str(), sizeof(addr.sun_path) - 1);
225 unlink(addr.sun_path);
226
227 if (bind(skt_fd, (struct sockaddr*)&addr, sizeof(struct sockaddr_un)) == -1) {
228 std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
229 method_call, kMmcServiceError, "Bind socket failed: " + std::string(strerror(errno))));
230 return;
231 }
232
233 // mmc_service group can read/write the socket.
234 int rc = chmod(addr.sun_path, 0770);
235 if (rc < 0) {
236 std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
237 method_call, kMmcServiceError, "Chmod socket failed: " + std::string(strerror(errno))));
238 return;
239 }
240
241 if (listen(skt_fd, kClientMaximum) == -1) {
242 std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
243 method_call, kMmcServiceError,
244 "Listen socket failed: " + std::string(strerror(errno))));
245 return;
246 }
247
248 // Create a thread and pass codec server and socket fd to it.
249 if (!StartWorkerThread(skt_fd, std::move(addr), std::move(codec_server))) {
250 std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(method_call, kMmcServiceError,
251 "No free thread available"));
252 return;
253 }
254
255 writer.AppendProtoAsArrayOfBytes(response);
256 std::move(sender).Run(std::move(dbus_response));
257 return;
258 }
259
CodecCleanUp(dbus::MethodCall * method_call,dbus::ExportedObject::ResponseSender sender)260 void Service::CodecCleanUp(dbus::MethodCall* method_call,
261 dbus::ExportedObject::ResponseSender sender) {
262 auto dbus_response = dbus::Response::FromMethodCall(method_call);
263 RemoveIdleThread();
264 std::move(sender).Run(std::move(dbus_response));
265 return;
266 }
267
StartWorkerThread(int fd,struct sockaddr_un addr,std::unique_ptr<MmcInterface> codec_server)268 bool Service::StartWorkerThread(int fd, struct sockaddr_un addr,
269 std::unique_ptr<MmcInterface> codec_server) {
270 // Each thread has its associated future to indicate task completion.
271 std::promise<void> task_ended;
272 thread_pool_.push_back(
273 std::make_pair(std::make_unique<bluetooth::common::MessageLoopThread>(kWorkerThreadName),
274 std::make_unique<std::future<void>>(task_ended.get_future())));
275
276 // Start up thread and assign task to it.
277 thread_pool_.back().first->StartUp();
278 if (!thread_pool_.back().first->IsRunning()) {
279 log::error("Failed to start thread");
280 return false;
281 }
282
283 // Real-time scheduling increases thread priority.
284 // Without it, the thread still works.
285 if (!thread_pool_.back().first->EnableRealTimeScheduling()) {
286 log::warn("Failed to enable real time scheduling");
287 }
288
289 if (!thread_pool_.back().first->DoInThread(
290 FROM_HERE, base::BindOnce(&StartSocketListener, fd, std::move(addr),
291 std::move(task_ended), std::move(codec_server)))) {
292 log::error("Failed to run task");
293 return false;
294 }
295
296 return true;
297 }
298
RemoveIdleThread()299 void Service::RemoveIdleThread() {
300 for (auto thread = thread_pool_.begin(); thread != thread_pool_.end();) {
301 if (thread->second->wait_for(std::chrono::milliseconds(kThreadCheckTimeout)) ==
302 std::future_status::ready) {
303 // The task is over, close the thread and remove it from the thread pool.
304 thread->first->ShutDown();
305 thread = thread_pool_.erase(thread);
306 } else {
307 thread++;
308 }
309 }
310 }
311
312 } // namespace mmc
313