xref: /aosp_15_r20/external/deqp/executor/xeTcpIpLink.cpp (revision 35238bce31c2a825756842865a792f8cf7f89930)
1 /*-------------------------------------------------------------------------
2  * drawElements Quality Program Test Executor
3  * ------------------------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Tcp/Ip communication link.
22  *//*--------------------------------------------------------------------*/
23 
24 #include "xeTcpIpLink.hpp"
25 #include "xsProtocol.hpp"
26 #include "deClock.h"
27 #include "deInt32.h"
28 
29 namespace xe
30 {
31 
//! Dimensions of the outgoing byte queue owned by the send thread.
enum
{
    //! First constructor argument of the send BlockBuffer; also the size of the
    //! staging array the send thread drains the queue into.
    SEND_BUFFER_BLOCK_SIZE = 1024,
    //! Second constructor argument of the send BlockBuffer.
    SEND_BUFFER_NUM_BLOCKS = 64
};
37 
38 // Utilities for writing messages out.
39 
writeMessageHeader(de::BlockBuffer<uint8_t> & dst,xs::MessageType type,int messageSize)40 static void writeMessageHeader(de::BlockBuffer<uint8_t> &dst, xs::MessageType type, int messageSize)
41 {
42     uint8_t hdr[xs::MESSAGE_HEADER_SIZE];
43     xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44     dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45 }
46 
writeKeepalive(de::BlockBuffer<uint8_t> & dst)47 static void writeKeepalive(de::BlockBuffer<uint8_t> &dst)
48 {
49     writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50     dst.flush();
51 }
52 
writeExecuteBinary(de::BlockBuffer<uint8_t> & dst,const char * name,const char * params,const char * workDir,const char * caseList)53 static void writeExecuteBinary(de::BlockBuffer<uint8_t> &dst, const char *name, const char *params, const char *workDir,
54                                const char *caseList)
55 {
56     int nameSize     = (int)strlen(name) + 1;
57     int paramsSize   = (int)strlen(params) + 1;
58     int workDirSize  = (int)strlen(workDir) + 1;
59     int caseListSize = (int)strlen(caseList) + 1;
60     int totalSize    = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
61 
62     writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
63     dst.write(nameSize, (const uint8_t *)name);
64     dst.write(paramsSize, (const uint8_t *)params);
65     dst.write(workDirSize, (const uint8_t *)workDir);
66     dst.write(caseListSize, (const uint8_t *)caseList);
67     dst.flush();
68 }
69 
writeStopExecution(de::BlockBuffer<uint8_t> & dst)70 static void writeStopExecution(de::BlockBuffer<uint8_t> &dst)
71 {
72     writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
73     dst.flush();
74 }
75 
76 // TcpIpLinkState
77 
TcpIpLinkState(CommLinkState initialState,const char * initialErr)78 TcpIpLinkState::TcpIpLinkState(CommLinkState initialState, const char *initialErr)
79     : m_state(initialState)
80     , m_error(initialErr)
81     , m_lastKeepaliveReceived(0)
82     , m_stateChangedCallback(DE_NULL)
83     , m_testLogDataCallback(DE_NULL)
84     , m_infoLogDataCallback(DE_NULL)
85     , m_userPtr(DE_NULL)
86 {
87 }
88 
~TcpIpLinkState(void)89 TcpIpLinkState::~TcpIpLinkState(void)
90 {
91 }
92 
getState(void) const93 CommLinkState TcpIpLinkState::getState(void) const
94 {
95     de::ScopedLock lock(m_lock);
96 
97     return m_state;
98 }
99 
getState(std::string & error) const100 CommLinkState TcpIpLinkState::getState(std::string &error) const
101 {
102     de::ScopedLock lock(m_lock);
103 
104     error = m_error;
105     return m_state;
106 }
107 
setCallbacks(CommLink::StateChangedFunc stateChangedCallback,CommLink::LogDataFunc testLogDataCallback,CommLink::LogDataFunc infoLogDataCallback,void * userPtr)108 void TcpIpLinkState::setCallbacks(CommLink::StateChangedFunc stateChangedCallback,
109                                   CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback,
110                                   void *userPtr)
111 {
112     de::ScopedLock lock(m_lock);
113 
114     m_stateChangedCallback = stateChangedCallback;
115     m_testLogDataCallback  = testLogDataCallback;
116     m_infoLogDataCallback  = infoLogDataCallback;
117     m_userPtr              = userPtr;
118 }
119 
setState(CommLinkState state,const char * error)120 void TcpIpLinkState::setState(CommLinkState state, const char *error)
121 {
122     CommLink::StateChangedFunc callback = DE_NULL;
123     void *userPtr                       = DE_NULL;
124 
125     {
126         de::ScopedLock lock(m_lock);
127 
128         m_state = state;
129         m_error = error;
130 
131         callback = m_stateChangedCallback;
132         userPtr  = m_userPtr;
133     }
134 
135     if (callback)
136         callback(userPtr, state, error);
137 }
138 
onTestLogData(const uint8_t * bytes,size_t numBytes) const139 void TcpIpLinkState::onTestLogData(const uint8_t *bytes, size_t numBytes) const
140 {
141     CommLink::LogDataFunc callback = DE_NULL;
142     void *userPtr                  = DE_NULL;
143 
144     m_lock.lock();
145     callback = m_testLogDataCallback;
146     userPtr  = m_userPtr;
147     m_lock.unlock();
148 
149     if (callback)
150         callback(userPtr, bytes, numBytes);
151 }
152 
onInfoLogData(const uint8_t * bytes,size_t numBytes) const153 void TcpIpLinkState::onInfoLogData(const uint8_t *bytes, size_t numBytes) const
154 {
155     CommLink::LogDataFunc callback = DE_NULL;
156     void *userPtr                  = DE_NULL;
157 
158     m_lock.lock();
159     callback = m_infoLogDataCallback;
160     userPtr  = m_userPtr;
161     m_lock.unlock();
162 
163     if (callback)
164         callback(userPtr, bytes, numBytes);
165 }
166 
onKeepaliveReceived(void)167 void TcpIpLinkState::onKeepaliveReceived(void)
168 {
169     de::ScopedLock lock(m_lock);
170     m_lastKeepaliveReceived = deGetMicroseconds();
171 }
172 
getLastKeepaliveRecevied(void) const173 uint64_t TcpIpLinkState::getLastKeepaliveRecevied(void) const
174 {
175     de::ScopedLock lock(m_lock);
176     return m_lastKeepaliveReceived;
177 }
178 
179 // TcpIpSendThread
180 
TcpIpSendThread(de::Socket & socket,TcpIpLinkState & state)181 TcpIpSendThread::TcpIpSendThread(de::Socket &socket, TcpIpLinkState &state)
182     : m_socket(socket)
183     , m_state(state)
184     , m_buffer(SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
185     , m_isRunning(false)
186 {
187 }
188 
~TcpIpSendThread(void)189 TcpIpSendThread::~TcpIpSendThread(void)
190 {
191 }
192 
start(void)193 void TcpIpSendThread::start(void)
194 {
195     DE_ASSERT(!m_isRunning);
196 
197     // Reset state.
198     m_buffer.clear();
199     m_isRunning = true;
200 
201     de::Thread::start();
202 }
203 
run(void)204 void TcpIpSendThread::run(void)
205 {
206     try
207     {
208         uint8_t buf[SEND_BUFFER_BLOCK_SIZE];
209 
210         while (!m_buffer.isCanceled())
211         {
212             size_t numToSend      = 0;
213             size_t numSent        = 0;
214             deSocketResult result = DE_SOCKETRESULT_LAST;
215 
216             try
217             {
218                 // Wait for single byte and then try to read more.
219                 m_buffer.read(1, &buf[0]);
220                 numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf) - 1, &buf[1]);
221             }
222             catch (const de::BlockBuffer<uint8_t>::CanceledException &)
223             {
224                 // Handled in loop condition.
225             }
226 
227             while (numSent < numToSend)
228             {
229                 result = m_socket.send(&buf[numSent], numToSend - numSent, &numSent);
230 
231                 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
232                     XE_FAIL("Connection closed");
233                 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
234                     XE_FAIL("Connection terminated");
235                 else if (result == DE_SOCKETRESULT_ERROR)
236                     XE_FAIL("Socket error");
237                 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
238                 {
239                     // \note Socket should not be in non-blocking mode.
240                     DE_ASSERT(numSent == 0);
241                     deYield();
242                 }
243                 else
244                     DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
245             }
246         }
247     }
248     catch (const std::exception &e)
249     {
250         m_state.setState(COMMLINKSTATE_ERROR, e.what());
251     }
252 }
253 
stop(void)254 void TcpIpSendThread::stop(void)
255 {
256     if (m_isRunning)
257     {
258         m_buffer.cancel();
259         join();
260         m_isRunning = false;
261     }
262 }
263 
264 // TcpIpRecvThread
265 
TcpIpRecvThread(de::Socket & socket,TcpIpLinkState & state)266 TcpIpRecvThread::TcpIpRecvThread(de::Socket &socket, TcpIpLinkState &state)
267     : m_socket(socket)
268     , m_state(state)
269     , m_curMsgPos(0)
270     , m_isRunning(false)
271 {
272 }
273 
~TcpIpRecvThread(void)274 TcpIpRecvThread::~TcpIpRecvThread(void)
275 {
276 }
277 
start(void)278 void TcpIpRecvThread::start(void)
279 {
280     DE_ASSERT(!m_isRunning);
281 
282     // Reset state.
283     m_curMsgPos = 0;
284     m_isRunning = true;
285 
286     de::Thread::start();
287 }
288 
run(void)289 void TcpIpRecvThread::run(void)
290 {
291     try
292     {
293         for (;;)
294         {
295             bool hasHeader              = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
296             bool hasPayload             = false;
297             size_t messageSize          = 0;
298             xs::MessageType messageType = (xs::MessageType)0;
299 
300             if (hasHeader)
301             {
302                 xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
303                 hasPayload = m_curMsgPos >= messageSize;
304             }
305 
306             if (hasPayload)
307             {
308                 // Process message.
309                 handleMessage(messageType,
310                               m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL,
311                               messageSize - xs::MESSAGE_HEADER_SIZE);
312                 m_curMsgPos = 0;
313             }
314             else
315             {
316                 // Try to receive missing bytes.
317                 size_t curSize        = hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
318                 size_t bytesToRecv    = curSize - m_curMsgPos;
319                 size_t numRecv        = 0;
320                 deSocketResult result = DE_SOCKETRESULT_LAST;
321 
322                 if (m_curMsgBuf.size() < curSize)
323                     m_curMsgBuf.resize(curSize);
324 
325                 result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
326 
327                 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
328                     XE_FAIL("Connection closed");
329                 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
330                     XE_FAIL("Connection terminated");
331                 else if (result == DE_SOCKETRESULT_ERROR)
332                     XE_FAIL("Socket error");
333                 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
334                 {
335                     // \note Socket should not be in non-blocking mode.
336                     DE_ASSERT(numRecv == 0);
337                     deYield();
338                 }
339                 else
340                 {
341                     DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
342                     DE_ASSERT(numRecv <= bytesToRecv);
343                     m_curMsgPos += numRecv;
344                     // Continue receiving bytes / handle message in next iter.
345                 }
346             }
347         }
348     }
349     catch (const std::exception &e)
350     {
351         m_state.setState(COMMLINKSTATE_ERROR, e.what());
352     }
353 }
354 
stop(void)355 void TcpIpRecvThread::stop(void)
356 {
357     if (m_isRunning)
358     {
359         // \note Socket must be closed before terminating receive thread.
360         XE_CHECK(!m_socket.isReceiveOpen());
361 
362         join();
363         m_isRunning = false;
364     }
365 }
366 
handleMessage(xs::MessageType messageType,const uint8_t * data,size_t dataSize)367 void TcpIpRecvThread::handleMessage(xs::MessageType messageType, const uint8_t *data, size_t dataSize)
368 {
369     switch (messageType)
370     {
371     case xs::MESSAGETYPE_KEEPALIVE:
372         m_state.onKeepaliveReceived();
373         break;
374 
375     case xs::MESSAGETYPE_PROCESS_STARTED:
376         XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
377         m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
378         break;
379 
380     case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
381     {
382         xs::ProcessLaunchFailedMessage msg(data, dataSize);
383         XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING,
384                      "Unexpected PROCESS_LAUNCH_FAILED message");
385         m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
386         break;
387     }
388 
389     case xs::MESSAGETYPE_PROCESS_FINISHED:
390     {
391         XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
392         xs::ProcessFinishedMessage msg(data, dataSize);
393         m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
394         DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
395         break;
396     }
397 
398     case xs::MESSAGETYPE_PROCESS_LOG_DATA:
399     case xs::MESSAGETYPE_INFO:
400         // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
401         if (data[dataSize - 1] == 0)
402             dataSize -= 1;
403 
404         if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
405         {
406             XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING,
407                          "Unexpected PROCESS_LOG_DATA message");
408             m_state.onTestLogData(&data[0], dataSize);
409         }
410         else
411             m_state.onInfoLogData(&data[0], dataSize);
412         break;
413 
414     default:
415         XE_FAIL("Unknown message");
416     }
417 }
418 
419 // TcpIpLink
420 
TcpIpLink(void)421 TcpIpLink::TcpIpLink(void)
422     : m_state(COMMLINKSTATE_ERROR, "Not connected")
423     , m_sendThread(m_socket, m_state)
424     , m_recvThread(m_socket, m_state)
425     , m_keepaliveTimer(DE_NULL)
426 {
427     m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
428     XE_CHECK(m_keepaliveTimer);
429 }
430 
~TcpIpLink(void)431 TcpIpLink::~TcpIpLink(void)
432 {
433     try
434     {
435         closeConnection();
436     }
437     catch (...)
438     {
439         // Can't do much except to ignore error.
440     }
441     deTimer_destroy(m_keepaliveTimer);
442 }
443 
closeConnection(void)444 void TcpIpLink::closeConnection(void)
445 {
446     {
447         deSocketState state = m_socket.getState();
448         if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
449             m_socket.shutdown();
450     }
451 
452     if (deTimer_isActive(m_keepaliveTimer))
453         deTimer_disable(m_keepaliveTimer);
454 
455     if (m_sendThread.isRunning())
456         m_sendThread.stop();
457 
458     if (m_recvThread.isRunning())
459         m_recvThread.stop();
460 
461     if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
462         m_socket.close();
463 }
464 
connect(const de::SocketAddress & address)465 void TcpIpLink::connect(const de::SocketAddress &address)
466 {
467     XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
468     XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
469     XE_CHECK(!m_sendThread.isRunning());
470     XE_CHECK(!m_recvThread.isRunning());
471 
472     m_socket.connect(address);
473 
474     try
475     {
476         // Clear error and set state to ready.
477         m_state.setState(COMMLINKSTATE_READY, "");
478         m_state.onKeepaliveReceived();
479 
480         // Launch threads.
481         m_sendThread.start();
482         m_recvThread.start();
483 
484         XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
485     }
486     catch (const std::exception &e)
487     {
488         closeConnection();
489         m_state.setState(COMMLINKSTATE_ERROR, e.what());
490         throw;
491     }
492 }
493 
disconnect(void)494 void TcpIpLink::disconnect(void)
495 {
496     try
497     {
498         closeConnection();
499         m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
500     }
501     catch (const std::exception &e)
502     {
503         m_state.setState(COMMLINKSTATE_ERROR, e.what());
504     }
505 }
506 
reset(void)507 void TcpIpLink::reset(void)
508 {
509     // \note Just clears error state if we are connected.
510     if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
511     {
512         m_state.setState(COMMLINKSTATE_READY, "");
513 
514         // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
515     }
516     else
517         disconnect(); // Abnormal state/usage. Disconnect socket.
518 }
519 
keepaliveTimerCallback(void * ptr)520 void TcpIpLink::keepaliveTimerCallback(void *ptr)
521 {
522     TcpIpLink *link        = static_cast<TcpIpLink *>(ptr);
523     uint64_t lastKeepalive = link->m_state.getLastKeepaliveRecevied();
524     uint64_t curTime       = deGetMicroseconds();
525 
526     // Check for timeout.
527     if ((int64_t)curTime - (int64_t)lastKeepalive > xs::KEEPALIVE_TIMEOUT * 1000)
528         link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
529 
530     // Enqueue new keepalive.
531     try
532     {
533         writeKeepalive(link->m_sendThread.getBuffer());
534     }
535     catch (const de::BlockBuffer<uint8_t>::CanceledException &)
536     {
537         // Ignore. Can happen in connection teardown.
538     }
539 }
540 
getState(void) const541 CommLinkState TcpIpLink::getState(void) const
542 {
543     return m_state.getState();
544 }
545 
getState(std::string & message) const546 CommLinkState TcpIpLink::getState(std::string &message) const
547 {
548     return m_state.getState(message);
549 }
550 
setCallbacks(StateChangedFunc stateChangedCallback,LogDataFunc testLogDataCallback,LogDataFunc infoLogDataCallback,void * userPtr)551 void TcpIpLink::setCallbacks(StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback,
552                              LogDataFunc infoLogDataCallback, void *userPtr)
553 {
554     m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
555 }
556 
startTestProcess(const char * name,const char * params,const char * workingDir,const char * caseList)557 void TcpIpLink::startTestProcess(const char *name, const char *params, const char *workingDir, const char *caseList)
558 {
559     XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
560 
561     m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
562     writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
563 }
564 
stopTestProcess(void)565 void TcpIpLink::stopTestProcess(void)
566 {
567     XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
568     writeStopExecution(m_sendThread.getBuffer());
569 }
570 
571 } // namespace xe
572