1 /*-------------------------------------------------------------------------
2 * drawElements Quality Program Test Executor
3 * ------------------------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Tcp/Ip communication link.
22 *//*--------------------------------------------------------------------*/
23
24 #include "xeTcpIpLink.hpp"
25 #include "xsProtocol.hpp"
26 #include "deClock.h"
27 #include "deInt32.h"
28
29 namespace xe
30 {
31
// Dimensions of the outgoing byte queue owned by TcpIpSendThread.
enum
{
    // First constructor argument of the send buffer; also the size of the
    // staging array the send thread drains the buffer into.
    SEND_BUFFER_BLOCK_SIZE = 1 << 10,
    // Second constructor argument of the send buffer.
    SEND_BUFFER_NUM_BLOCKS = 1 << 6
};
37
38 // Utilities for writing messages out.
39
writeMessageHeader(de::BlockBuffer<uint8_t> & dst,xs::MessageType type,int messageSize)40 static void writeMessageHeader(de::BlockBuffer<uint8_t> &dst, xs::MessageType type, int messageSize)
41 {
42 uint8_t hdr[xs::MESSAGE_HEADER_SIZE];
43 xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44 dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45 }
46
writeKeepalive(de::BlockBuffer<uint8_t> & dst)47 static void writeKeepalive(de::BlockBuffer<uint8_t> &dst)
48 {
49 writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50 dst.flush();
51 }
52
writeExecuteBinary(de::BlockBuffer<uint8_t> & dst,const char * name,const char * params,const char * workDir,const char * caseList)53 static void writeExecuteBinary(de::BlockBuffer<uint8_t> &dst, const char *name, const char *params, const char *workDir,
54 const char *caseList)
55 {
56 int nameSize = (int)strlen(name) + 1;
57 int paramsSize = (int)strlen(params) + 1;
58 int workDirSize = (int)strlen(workDir) + 1;
59 int caseListSize = (int)strlen(caseList) + 1;
60 int totalSize = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
61
62 writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
63 dst.write(nameSize, (const uint8_t *)name);
64 dst.write(paramsSize, (const uint8_t *)params);
65 dst.write(workDirSize, (const uint8_t *)workDir);
66 dst.write(caseListSize, (const uint8_t *)caseList);
67 dst.flush();
68 }
69
writeStopExecution(de::BlockBuffer<uint8_t> & dst)70 static void writeStopExecution(de::BlockBuffer<uint8_t> &dst)
71 {
72 writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
73 dst.flush();
74 }
75
76 // TcpIpLinkState
77
TcpIpLinkState(CommLinkState initialState,const char * initialErr)78 TcpIpLinkState::TcpIpLinkState(CommLinkState initialState, const char *initialErr)
79 : m_state(initialState)
80 , m_error(initialErr)
81 , m_lastKeepaliveReceived(0)
82 , m_stateChangedCallback(DE_NULL)
83 , m_testLogDataCallback(DE_NULL)
84 , m_infoLogDataCallback(DE_NULL)
85 , m_userPtr(DE_NULL)
86 {
87 }
88
~TcpIpLinkState(void)89 TcpIpLinkState::~TcpIpLinkState(void)
90 {
91 }
92
getState(void) const93 CommLinkState TcpIpLinkState::getState(void) const
94 {
95 de::ScopedLock lock(m_lock);
96
97 return m_state;
98 }
99
getState(std::string & error) const100 CommLinkState TcpIpLinkState::getState(std::string &error) const
101 {
102 de::ScopedLock lock(m_lock);
103
104 error = m_error;
105 return m_state;
106 }
107
setCallbacks(CommLink::StateChangedFunc stateChangedCallback,CommLink::LogDataFunc testLogDataCallback,CommLink::LogDataFunc infoLogDataCallback,void * userPtr)108 void TcpIpLinkState::setCallbacks(CommLink::StateChangedFunc stateChangedCallback,
109 CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback,
110 void *userPtr)
111 {
112 de::ScopedLock lock(m_lock);
113
114 m_stateChangedCallback = stateChangedCallback;
115 m_testLogDataCallback = testLogDataCallback;
116 m_infoLogDataCallback = infoLogDataCallback;
117 m_userPtr = userPtr;
118 }
119
setState(CommLinkState state,const char * error)120 void TcpIpLinkState::setState(CommLinkState state, const char *error)
121 {
122 CommLink::StateChangedFunc callback = DE_NULL;
123 void *userPtr = DE_NULL;
124
125 {
126 de::ScopedLock lock(m_lock);
127
128 m_state = state;
129 m_error = error;
130
131 callback = m_stateChangedCallback;
132 userPtr = m_userPtr;
133 }
134
135 if (callback)
136 callback(userPtr, state, error);
137 }
138
onTestLogData(const uint8_t * bytes,size_t numBytes) const139 void TcpIpLinkState::onTestLogData(const uint8_t *bytes, size_t numBytes) const
140 {
141 CommLink::LogDataFunc callback = DE_NULL;
142 void *userPtr = DE_NULL;
143
144 m_lock.lock();
145 callback = m_testLogDataCallback;
146 userPtr = m_userPtr;
147 m_lock.unlock();
148
149 if (callback)
150 callback(userPtr, bytes, numBytes);
151 }
152
onInfoLogData(const uint8_t * bytes,size_t numBytes) const153 void TcpIpLinkState::onInfoLogData(const uint8_t *bytes, size_t numBytes) const
154 {
155 CommLink::LogDataFunc callback = DE_NULL;
156 void *userPtr = DE_NULL;
157
158 m_lock.lock();
159 callback = m_infoLogDataCallback;
160 userPtr = m_userPtr;
161 m_lock.unlock();
162
163 if (callback)
164 callback(userPtr, bytes, numBytes);
165 }
166
onKeepaliveReceived(void)167 void TcpIpLinkState::onKeepaliveReceived(void)
168 {
169 de::ScopedLock lock(m_lock);
170 m_lastKeepaliveReceived = deGetMicroseconds();
171 }
172
getLastKeepaliveRecevied(void) const173 uint64_t TcpIpLinkState::getLastKeepaliveRecevied(void) const
174 {
175 de::ScopedLock lock(m_lock);
176 return m_lastKeepaliveReceived;
177 }
178
179 // TcpIpSendThread
180
TcpIpSendThread(de::Socket & socket,TcpIpLinkState & state)181 TcpIpSendThread::TcpIpSendThread(de::Socket &socket, TcpIpLinkState &state)
182 : m_socket(socket)
183 , m_state(state)
184 , m_buffer(SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
185 , m_isRunning(false)
186 {
187 }
188
~TcpIpSendThread(void)189 TcpIpSendThread::~TcpIpSendThread(void)
190 {
191 }
192
start(void)193 void TcpIpSendThread::start(void)
194 {
195 DE_ASSERT(!m_isRunning);
196
197 // Reset state.
198 m_buffer.clear();
199 m_isRunning = true;
200
201 de::Thread::start();
202 }
203
run(void)204 void TcpIpSendThread::run(void)
205 {
206 try
207 {
208 uint8_t buf[SEND_BUFFER_BLOCK_SIZE];
209
210 while (!m_buffer.isCanceled())
211 {
212 size_t numToSend = 0;
213 size_t numSent = 0;
214 deSocketResult result = DE_SOCKETRESULT_LAST;
215
216 try
217 {
218 // Wait for single byte and then try to read more.
219 m_buffer.read(1, &buf[0]);
220 numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf) - 1, &buf[1]);
221 }
222 catch (const de::BlockBuffer<uint8_t>::CanceledException &)
223 {
224 // Handled in loop condition.
225 }
226
227 while (numSent < numToSend)
228 {
229 result = m_socket.send(&buf[numSent], numToSend - numSent, &numSent);
230
231 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
232 XE_FAIL("Connection closed");
233 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
234 XE_FAIL("Connection terminated");
235 else if (result == DE_SOCKETRESULT_ERROR)
236 XE_FAIL("Socket error");
237 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
238 {
239 // \note Socket should not be in non-blocking mode.
240 DE_ASSERT(numSent == 0);
241 deYield();
242 }
243 else
244 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
245 }
246 }
247 }
248 catch (const std::exception &e)
249 {
250 m_state.setState(COMMLINKSTATE_ERROR, e.what());
251 }
252 }
253
stop(void)254 void TcpIpSendThread::stop(void)
255 {
256 if (m_isRunning)
257 {
258 m_buffer.cancel();
259 join();
260 m_isRunning = false;
261 }
262 }
263
264 // TcpIpRecvThread
265
TcpIpRecvThread(de::Socket & socket,TcpIpLinkState & state)266 TcpIpRecvThread::TcpIpRecvThread(de::Socket &socket, TcpIpLinkState &state)
267 : m_socket(socket)
268 , m_state(state)
269 , m_curMsgPos(0)
270 , m_isRunning(false)
271 {
272 }
273
~TcpIpRecvThread(void)274 TcpIpRecvThread::~TcpIpRecvThread(void)
275 {
276 }
277
start(void)278 void TcpIpRecvThread::start(void)
279 {
280 DE_ASSERT(!m_isRunning);
281
282 // Reset state.
283 m_curMsgPos = 0;
284 m_isRunning = true;
285
286 de::Thread::start();
287 }
288
run(void)289 void TcpIpRecvThread::run(void)
290 {
291 try
292 {
293 for (;;)
294 {
295 bool hasHeader = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
296 bool hasPayload = false;
297 size_t messageSize = 0;
298 xs::MessageType messageType = (xs::MessageType)0;
299
300 if (hasHeader)
301 {
302 xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
303 hasPayload = m_curMsgPos >= messageSize;
304 }
305
306 if (hasPayload)
307 {
308 // Process message.
309 handleMessage(messageType,
310 m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL,
311 messageSize - xs::MESSAGE_HEADER_SIZE);
312 m_curMsgPos = 0;
313 }
314 else
315 {
316 // Try to receive missing bytes.
317 size_t curSize = hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
318 size_t bytesToRecv = curSize - m_curMsgPos;
319 size_t numRecv = 0;
320 deSocketResult result = DE_SOCKETRESULT_LAST;
321
322 if (m_curMsgBuf.size() < curSize)
323 m_curMsgBuf.resize(curSize);
324
325 result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
326
327 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
328 XE_FAIL("Connection closed");
329 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
330 XE_FAIL("Connection terminated");
331 else if (result == DE_SOCKETRESULT_ERROR)
332 XE_FAIL("Socket error");
333 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
334 {
335 // \note Socket should not be in non-blocking mode.
336 DE_ASSERT(numRecv == 0);
337 deYield();
338 }
339 else
340 {
341 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
342 DE_ASSERT(numRecv <= bytesToRecv);
343 m_curMsgPos += numRecv;
344 // Continue receiving bytes / handle message in next iter.
345 }
346 }
347 }
348 }
349 catch (const std::exception &e)
350 {
351 m_state.setState(COMMLINKSTATE_ERROR, e.what());
352 }
353 }
354
stop(void)355 void TcpIpRecvThread::stop(void)
356 {
357 if (m_isRunning)
358 {
359 // \note Socket must be closed before terminating receive thread.
360 XE_CHECK(!m_socket.isReceiveOpen());
361
362 join();
363 m_isRunning = false;
364 }
365 }
366
handleMessage(xs::MessageType messageType,const uint8_t * data,size_t dataSize)367 void TcpIpRecvThread::handleMessage(xs::MessageType messageType, const uint8_t *data, size_t dataSize)
368 {
369 switch (messageType)
370 {
371 case xs::MESSAGETYPE_KEEPALIVE:
372 m_state.onKeepaliveReceived();
373 break;
374
375 case xs::MESSAGETYPE_PROCESS_STARTED:
376 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
377 m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
378 break;
379
380 case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
381 {
382 xs::ProcessLaunchFailedMessage msg(data, dataSize);
383 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING,
384 "Unexpected PROCESS_LAUNCH_FAILED message");
385 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
386 break;
387 }
388
389 case xs::MESSAGETYPE_PROCESS_FINISHED:
390 {
391 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
392 xs::ProcessFinishedMessage msg(data, dataSize);
393 m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
394 DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
395 break;
396 }
397
398 case xs::MESSAGETYPE_PROCESS_LOG_DATA:
399 case xs::MESSAGETYPE_INFO:
400 // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
401 if (data[dataSize - 1] == 0)
402 dataSize -= 1;
403
404 if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
405 {
406 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING,
407 "Unexpected PROCESS_LOG_DATA message");
408 m_state.onTestLogData(&data[0], dataSize);
409 }
410 else
411 m_state.onInfoLogData(&data[0], dataSize);
412 break;
413
414 default:
415 XE_FAIL("Unknown message");
416 }
417 }
418
419 // TcpIpLink
420
TcpIpLink(void)421 TcpIpLink::TcpIpLink(void)
422 : m_state(COMMLINKSTATE_ERROR, "Not connected")
423 , m_sendThread(m_socket, m_state)
424 , m_recvThread(m_socket, m_state)
425 , m_keepaliveTimer(DE_NULL)
426 {
427 m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
428 XE_CHECK(m_keepaliveTimer);
429 }
430
~TcpIpLink(void)431 TcpIpLink::~TcpIpLink(void)
432 {
433 try
434 {
435 closeConnection();
436 }
437 catch (...)
438 {
439 // Can't do much except to ignore error.
440 }
441 deTimer_destroy(m_keepaliveTimer);
442 }
443
closeConnection(void)444 void TcpIpLink::closeConnection(void)
445 {
446 {
447 deSocketState state = m_socket.getState();
448 if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
449 m_socket.shutdown();
450 }
451
452 if (deTimer_isActive(m_keepaliveTimer))
453 deTimer_disable(m_keepaliveTimer);
454
455 if (m_sendThread.isRunning())
456 m_sendThread.stop();
457
458 if (m_recvThread.isRunning())
459 m_recvThread.stop();
460
461 if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
462 m_socket.close();
463 }
464
connect(const de::SocketAddress & address)465 void TcpIpLink::connect(const de::SocketAddress &address)
466 {
467 XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
468 XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
469 XE_CHECK(!m_sendThread.isRunning());
470 XE_CHECK(!m_recvThread.isRunning());
471
472 m_socket.connect(address);
473
474 try
475 {
476 // Clear error and set state to ready.
477 m_state.setState(COMMLINKSTATE_READY, "");
478 m_state.onKeepaliveReceived();
479
480 // Launch threads.
481 m_sendThread.start();
482 m_recvThread.start();
483
484 XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
485 }
486 catch (const std::exception &e)
487 {
488 closeConnection();
489 m_state.setState(COMMLINKSTATE_ERROR, e.what());
490 throw;
491 }
492 }
493
disconnect(void)494 void TcpIpLink::disconnect(void)
495 {
496 try
497 {
498 closeConnection();
499 m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
500 }
501 catch (const std::exception &e)
502 {
503 m_state.setState(COMMLINKSTATE_ERROR, e.what());
504 }
505 }
506
reset(void)507 void TcpIpLink::reset(void)
508 {
509 // \note Just clears error state if we are connected.
510 if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
511 {
512 m_state.setState(COMMLINKSTATE_READY, "");
513
514 // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
515 }
516 else
517 disconnect(); // Abnormal state/usage. Disconnect socket.
518 }
519
keepaliveTimerCallback(void * ptr)520 void TcpIpLink::keepaliveTimerCallback(void *ptr)
521 {
522 TcpIpLink *link = static_cast<TcpIpLink *>(ptr);
523 uint64_t lastKeepalive = link->m_state.getLastKeepaliveRecevied();
524 uint64_t curTime = deGetMicroseconds();
525
526 // Check for timeout.
527 if ((int64_t)curTime - (int64_t)lastKeepalive > xs::KEEPALIVE_TIMEOUT * 1000)
528 link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
529
530 // Enqueue new keepalive.
531 try
532 {
533 writeKeepalive(link->m_sendThread.getBuffer());
534 }
535 catch (const de::BlockBuffer<uint8_t>::CanceledException &)
536 {
537 // Ignore. Can happen in connection teardown.
538 }
539 }
540
getState(void) const541 CommLinkState TcpIpLink::getState(void) const
542 {
543 return m_state.getState();
544 }
545
getState(std::string & message) const546 CommLinkState TcpIpLink::getState(std::string &message) const
547 {
548 return m_state.getState(message);
549 }
550
setCallbacks(StateChangedFunc stateChangedCallback,LogDataFunc testLogDataCallback,LogDataFunc infoLogDataCallback,void * userPtr)551 void TcpIpLink::setCallbacks(StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback,
552 LogDataFunc infoLogDataCallback, void *userPtr)
553 {
554 m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
555 }
556
startTestProcess(const char * name,const char * params,const char * workingDir,const char * caseList)557 void TcpIpLink::startTestProcess(const char *name, const char *params, const char *workingDir, const char *caseList)
558 {
559 XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
560
561 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
562 writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
563 }
564
stopTestProcess(void)565 void TcpIpLink::stopTestProcess(void)
566 {
567 XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
568 writeStopExecution(m_sendThread.getBuffer());
569 }
570
571 } // namespace xe
572