1*35238bceSAndroid Build Coastguard Worker /*-------------------------------------------------------------------------
2*35238bceSAndroid Build Coastguard Worker * drawElements Quality Program Test Executor
3*35238bceSAndroid Build Coastguard Worker * ------------------------------------------
4*35238bceSAndroid Build Coastguard Worker *
5*35238bceSAndroid Build Coastguard Worker * Copyright 2014 The Android Open Source Project
6*35238bceSAndroid Build Coastguard Worker *
7*35238bceSAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License");
8*35238bceSAndroid Build Coastguard Worker * you may not use this file except in compliance with the License.
9*35238bceSAndroid Build Coastguard Worker * You may obtain a copy of the License at
10*35238bceSAndroid Build Coastguard Worker *
11*35238bceSAndroid Build Coastguard Worker * http://www.apache.org/licenses/LICENSE-2.0
12*35238bceSAndroid Build Coastguard Worker *
13*35238bceSAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software
14*35238bceSAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS,
15*35238bceSAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16*35238bceSAndroid Build Coastguard Worker * See the License for the specific language governing permissions and
17*35238bceSAndroid Build Coastguard Worker * limitations under the License.
18*35238bceSAndroid Build Coastguard Worker *
19*35238bceSAndroid Build Coastguard Worker *//*!
20*35238bceSAndroid Build Coastguard Worker * \file
21*35238bceSAndroid Build Coastguard Worker * \brief Tcp/Ip communication link.
22*35238bceSAndroid Build Coastguard Worker *//*--------------------------------------------------------------------*/
23*35238bceSAndroid Build Coastguard Worker
24*35238bceSAndroid Build Coastguard Worker #include "xeTcpIpLink.hpp"
25*35238bceSAndroid Build Coastguard Worker #include "xsProtocol.hpp"
26*35238bceSAndroid Build Coastguard Worker #include "deClock.h"
27*35238bceSAndroid Build Coastguard Worker #include "deInt32.h"
28*35238bceSAndroid Build Coastguard Worker
29*35238bceSAndroid Build Coastguard Worker namespace xe
30*35238bceSAndroid Build Coastguard Worker {
31*35238bceSAndroid Build Coastguard Worker
// Dimensions of the send thread's outgoing block buffer.
enum
{
    //! Elements per buffer block; also the size of the send thread's staging array.
    SEND_BUFFER_BLOCK_SIZE = 1024,
    //! Number of blocks in the buffer.
    SEND_BUFFER_NUM_BLOCKS = 64
};
37*35238bceSAndroid Build Coastguard Worker
38*35238bceSAndroid Build Coastguard Worker // Utilities for writing messages out.
39*35238bceSAndroid Build Coastguard Worker
writeMessageHeader(de::BlockBuffer<uint8_t> & dst,xs::MessageType type,int messageSize)40*35238bceSAndroid Build Coastguard Worker static void writeMessageHeader(de::BlockBuffer<uint8_t> &dst, xs::MessageType type, int messageSize)
41*35238bceSAndroid Build Coastguard Worker {
42*35238bceSAndroid Build Coastguard Worker uint8_t hdr[xs::MESSAGE_HEADER_SIZE];
43*35238bceSAndroid Build Coastguard Worker xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44*35238bceSAndroid Build Coastguard Worker dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45*35238bceSAndroid Build Coastguard Worker }
46*35238bceSAndroid Build Coastguard Worker
writeKeepalive(de::BlockBuffer<uint8_t> & dst)47*35238bceSAndroid Build Coastguard Worker static void writeKeepalive(de::BlockBuffer<uint8_t> &dst)
48*35238bceSAndroid Build Coastguard Worker {
49*35238bceSAndroid Build Coastguard Worker writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50*35238bceSAndroid Build Coastguard Worker dst.flush();
51*35238bceSAndroid Build Coastguard Worker }
52*35238bceSAndroid Build Coastguard Worker
writeExecuteBinary(de::BlockBuffer<uint8_t> & dst,const char * name,const char * params,const char * workDir,const char * caseList)53*35238bceSAndroid Build Coastguard Worker static void writeExecuteBinary(de::BlockBuffer<uint8_t> &dst, const char *name, const char *params, const char *workDir,
54*35238bceSAndroid Build Coastguard Worker const char *caseList)
55*35238bceSAndroid Build Coastguard Worker {
56*35238bceSAndroid Build Coastguard Worker int nameSize = (int)strlen(name) + 1;
57*35238bceSAndroid Build Coastguard Worker int paramsSize = (int)strlen(params) + 1;
58*35238bceSAndroid Build Coastguard Worker int workDirSize = (int)strlen(workDir) + 1;
59*35238bceSAndroid Build Coastguard Worker int caseListSize = (int)strlen(caseList) + 1;
60*35238bceSAndroid Build Coastguard Worker int totalSize = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
61*35238bceSAndroid Build Coastguard Worker
62*35238bceSAndroid Build Coastguard Worker writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
63*35238bceSAndroid Build Coastguard Worker dst.write(nameSize, (const uint8_t *)name);
64*35238bceSAndroid Build Coastguard Worker dst.write(paramsSize, (const uint8_t *)params);
65*35238bceSAndroid Build Coastguard Worker dst.write(workDirSize, (const uint8_t *)workDir);
66*35238bceSAndroid Build Coastguard Worker dst.write(caseListSize, (const uint8_t *)caseList);
67*35238bceSAndroid Build Coastguard Worker dst.flush();
68*35238bceSAndroid Build Coastguard Worker }
69*35238bceSAndroid Build Coastguard Worker
writeStopExecution(de::BlockBuffer<uint8_t> & dst)70*35238bceSAndroid Build Coastguard Worker static void writeStopExecution(de::BlockBuffer<uint8_t> &dst)
71*35238bceSAndroid Build Coastguard Worker {
72*35238bceSAndroid Build Coastguard Worker writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
73*35238bceSAndroid Build Coastguard Worker dst.flush();
74*35238bceSAndroid Build Coastguard Worker }
75*35238bceSAndroid Build Coastguard Worker
76*35238bceSAndroid Build Coastguard Worker // TcpIpLinkState
77*35238bceSAndroid Build Coastguard Worker
TcpIpLinkState(CommLinkState initialState,const char * initialErr)78*35238bceSAndroid Build Coastguard Worker TcpIpLinkState::TcpIpLinkState(CommLinkState initialState, const char *initialErr)
79*35238bceSAndroid Build Coastguard Worker : m_state(initialState)
80*35238bceSAndroid Build Coastguard Worker , m_error(initialErr)
81*35238bceSAndroid Build Coastguard Worker , m_lastKeepaliveReceived(0)
82*35238bceSAndroid Build Coastguard Worker , m_stateChangedCallback(DE_NULL)
83*35238bceSAndroid Build Coastguard Worker , m_testLogDataCallback(DE_NULL)
84*35238bceSAndroid Build Coastguard Worker , m_infoLogDataCallback(DE_NULL)
85*35238bceSAndroid Build Coastguard Worker , m_userPtr(DE_NULL)
86*35238bceSAndroid Build Coastguard Worker {
87*35238bceSAndroid Build Coastguard Worker }
88*35238bceSAndroid Build Coastguard Worker
~TcpIpLinkState(void)89*35238bceSAndroid Build Coastguard Worker TcpIpLinkState::~TcpIpLinkState(void)
90*35238bceSAndroid Build Coastguard Worker {
91*35238bceSAndroid Build Coastguard Worker }
92*35238bceSAndroid Build Coastguard Worker
getState(void) const93*35238bceSAndroid Build Coastguard Worker CommLinkState TcpIpLinkState::getState(void) const
94*35238bceSAndroid Build Coastguard Worker {
95*35238bceSAndroid Build Coastguard Worker de::ScopedLock lock(m_lock);
96*35238bceSAndroid Build Coastguard Worker
97*35238bceSAndroid Build Coastguard Worker return m_state;
98*35238bceSAndroid Build Coastguard Worker }
99*35238bceSAndroid Build Coastguard Worker
getState(std::string & error) const100*35238bceSAndroid Build Coastguard Worker CommLinkState TcpIpLinkState::getState(std::string &error) const
101*35238bceSAndroid Build Coastguard Worker {
102*35238bceSAndroid Build Coastguard Worker de::ScopedLock lock(m_lock);
103*35238bceSAndroid Build Coastguard Worker
104*35238bceSAndroid Build Coastguard Worker error = m_error;
105*35238bceSAndroid Build Coastguard Worker return m_state;
106*35238bceSAndroid Build Coastguard Worker }
107*35238bceSAndroid Build Coastguard Worker
setCallbacks(CommLink::StateChangedFunc stateChangedCallback,CommLink::LogDataFunc testLogDataCallback,CommLink::LogDataFunc infoLogDataCallback,void * userPtr)108*35238bceSAndroid Build Coastguard Worker void TcpIpLinkState::setCallbacks(CommLink::StateChangedFunc stateChangedCallback,
109*35238bceSAndroid Build Coastguard Worker CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback,
110*35238bceSAndroid Build Coastguard Worker void *userPtr)
111*35238bceSAndroid Build Coastguard Worker {
112*35238bceSAndroid Build Coastguard Worker de::ScopedLock lock(m_lock);
113*35238bceSAndroid Build Coastguard Worker
114*35238bceSAndroid Build Coastguard Worker m_stateChangedCallback = stateChangedCallback;
115*35238bceSAndroid Build Coastguard Worker m_testLogDataCallback = testLogDataCallback;
116*35238bceSAndroid Build Coastguard Worker m_infoLogDataCallback = infoLogDataCallback;
117*35238bceSAndroid Build Coastguard Worker m_userPtr = userPtr;
118*35238bceSAndroid Build Coastguard Worker }
119*35238bceSAndroid Build Coastguard Worker
setState(CommLinkState state,const char * error)120*35238bceSAndroid Build Coastguard Worker void TcpIpLinkState::setState(CommLinkState state, const char *error)
121*35238bceSAndroid Build Coastguard Worker {
122*35238bceSAndroid Build Coastguard Worker CommLink::StateChangedFunc callback = DE_NULL;
123*35238bceSAndroid Build Coastguard Worker void *userPtr = DE_NULL;
124*35238bceSAndroid Build Coastguard Worker
125*35238bceSAndroid Build Coastguard Worker {
126*35238bceSAndroid Build Coastguard Worker de::ScopedLock lock(m_lock);
127*35238bceSAndroid Build Coastguard Worker
128*35238bceSAndroid Build Coastguard Worker m_state = state;
129*35238bceSAndroid Build Coastguard Worker m_error = error;
130*35238bceSAndroid Build Coastguard Worker
131*35238bceSAndroid Build Coastguard Worker callback = m_stateChangedCallback;
132*35238bceSAndroid Build Coastguard Worker userPtr = m_userPtr;
133*35238bceSAndroid Build Coastguard Worker }
134*35238bceSAndroid Build Coastguard Worker
135*35238bceSAndroid Build Coastguard Worker if (callback)
136*35238bceSAndroid Build Coastguard Worker callback(userPtr, state, error);
137*35238bceSAndroid Build Coastguard Worker }
138*35238bceSAndroid Build Coastguard Worker
onTestLogData(const uint8_t * bytes,size_t numBytes) const139*35238bceSAndroid Build Coastguard Worker void TcpIpLinkState::onTestLogData(const uint8_t *bytes, size_t numBytes) const
140*35238bceSAndroid Build Coastguard Worker {
141*35238bceSAndroid Build Coastguard Worker CommLink::LogDataFunc callback = DE_NULL;
142*35238bceSAndroid Build Coastguard Worker void *userPtr = DE_NULL;
143*35238bceSAndroid Build Coastguard Worker
144*35238bceSAndroid Build Coastguard Worker m_lock.lock();
145*35238bceSAndroid Build Coastguard Worker callback = m_testLogDataCallback;
146*35238bceSAndroid Build Coastguard Worker userPtr = m_userPtr;
147*35238bceSAndroid Build Coastguard Worker m_lock.unlock();
148*35238bceSAndroid Build Coastguard Worker
149*35238bceSAndroid Build Coastguard Worker if (callback)
150*35238bceSAndroid Build Coastguard Worker callback(userPtr, bytes, numBytes);
151*35238bceSAndroid Build Coastguard Worker }
152*35238bceSAndroid Build Coastguard Worker
onInfoLogData(const uint8_t * bytes,size_t numBytes) const153*35238bceSAndroid Build Coastguard Worker void TcpIpLinkState::onInfoLogData(const uint8_t *bytes, size_t numBytes) const
154*35238bceSAndroid Build Coastguard Worker {
155*35238bceSAndroid Build Coastguard Worker CommLink::LogDataFunc callback = DE_NULL;
156*35238bceSAndroid Build Coastguard Worker void *userPtr = DE_NULL;
157*35238bceSAndroid Build Coastguard Worker
158*35238bceSAndroid Build Coastguard Worker m_lock.lock();
159*35238bceSAndroid Build Coastguard Worker callback = m_infoLogDataCallback;
160*35238bceSAndroid Build Coastguard Worker userPtr = m_userPtr;
161*35238bceSAndroid Build Coastguard Worker m_lock.unlock();
162*35238bceSAndroid Build Coastguard Worker
163*35238bceSAndroid Build Coastguard Worker if (callback)
164*35238bceSAndroid Build Coastguard Worker callback(userPtr, bytes, numBytes);
165*35238bceSAndroid Build Coastguard Worker }
166*35238bceSAndroid Build Coastguard Worker
onKeepaliveReceived(void)167*35238bceSAndroid Build Coastguard Worker void TcpIpLinkState::onKeepaliveReceived(void)
168*35238bceSAndroid Build Coastguard Worker {
169*35238bceSAndroid Build Coastguard Worker de::ScopedLock lock(m_lock);
170*35238bceSAndroid Build Coastguard Worker m_lastKeepaliveReceived = deGetMicroseconds();
171*35238bceSAndroid Build Coastguard Worker }
172*35238bceSAndroid Build Coastguard Worker
getLastKeepaliveRecevied(void) const173*35238bceSAndroid Build Coastguard Worker uint64_t TcpIpLinkState::getLastKeepaliveRecevied(void) const
174*35238bceSAndroid Build Coastguard Worker {
175*35238bceSAndroid Build Coastguard Worker de::ScopedLock lock(m_lock);
176*35238bceSAndroid Build Coastguard Worker return m_lastKeepaliveReceived;
177*35238bceSAndroid Build Coastguard Worker }
178*35238bceSAndroid Build Coastguard Worker
179*35238bceSAndroid Build Coastguard Worker // TcpIpSendThread
180*35238bceSAndroid Build Coastguard Worker
TcpIpSendThread(de::Socket & socket,TcpIpLinkState & state)181*35238bceSAndroid Build Coastguard Worker TcpIpSendThread::TcpIpSendThread(de::Socket &socket, TcpIpLinkState &state)
182*35238bceSAndroid Build Coastguard Worker : m_socket(socket)
183*35238bceSAndroid Build Coastguard Worker , m_state(state)
184*35238bceSAndroid Build Coastguard Worker , m_buffer(SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
185*35238bceSAndroid Build Coastguard Worker , m_isRunning(false)
186*35238bceSAndroid Build Coastguard Worker {
187*35238bceSAndroid Build Coastguard Worker }
188*35238bceSAndroid Build Coastguard Worker
~TcpIpSendThread(void)189*35238bceSAndroid Build Coastguard Worker TcpIpSendThread::~TcpIpSendThread(void)
190*35238bceSAndroid Build Coastguard Worker {
191*35238bceSAndroid Build Coastguard Worker }
192*35238bceSAndroid Build Coastguard Worker
start(void)193*35238bceSAndroid Build Coastguard Worker void TcpIpSendThread::start(void)
194*35238bceSAndroid Build Coastguard Worker {
195*35238bceSAndroid Build Coastguard Worker DE_ASSERT(!m_isRunning);
196*35238bceSAndroid Build Coastguard Worker
197*35238bceSAndroid Build Coastguard Worker // Reset state.
198*35238bceSAndroid Build Coastguard Worker m_buffer.clear();
199*35238bceSAndroid Build Coastguard Worker m_isRunning = true;
200*35238bceSAndroid Build Coastguard Worker
201*35238bceSAndroid Build Coastguard Worker de::Thread::start();
202*35238bceSAndroid Build Coastguard Worker }
203*35238bceSAndroid Build Coastguard Worker
run(void)204*35238bceSAndroid Build Coastguard Worker void TcpIpSendThread::run(void)
205*35238bceSAndroid Build Coastguard Worker {
206*35238bceSAndroid Build Coastguard Worker try
207*35238bceSAndroid Build Coastguard Worker {
208*35238bceSAndroid Build Coastguard Worker uint8_t buf[SEND_BUFFER_BLOCK_SIZE];
209*35238bceSAndroid Build Coastguard Worker
210*35238bceSAndroid Build Coastguard Worker while (!m_buffer.isCanceled())
211*35238bceSAndroid Build Coastguard Worker {
212*35238bceSAndroid Build Coastguard Worker size_t numToSend = 0;
213*35238bceSAndroid Build Coastguard Worker size_t numSent = 0;
214*35238bceSAndroid Build Coastguard Worker deSocketResult result = DE_SOCKETRESULT_LAST;
215*35238bceSAndroid Build Coastguard Worker
216*35238bceSAndroid Build Coastguard Worker try
217*35238bceSAndroid Build Coastguard Worker {
218*35238bceSAndroid Build Coastguard Worker // Wait for single byte and then try to read more.
219*35238bceSAndroid Build Coastguard Worker m_buffer.read(1, &buf[0]);
220*35238bceSAndroid Build Coastguard Worker numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf) - 1, &buf[1]);
221*35238bceSAndroid Build Coastguard Worker }
222*35238bceSAndroid Build Coastguard Worker catch (const de::BlockBuffer<uint8_t>::CanceledException &)
223*35238bceSAndroid Build Coastguard Worker {
224*35238bceSAndroid Build Coastguard Worker // Handled in loop condition.
225*35238bceSAndroid Build Coastguard Worker }
226*35238bceSAndroid Build Coastguard Worker
227*35238bceSAndroid Build Coastguard Worker while (numSent < numToSend)
228*35238bceSAndroid Build Coastguard Worker {
229*35238bceSAndroid Build Coastguard Worker result = m_socket.send(&buf[numSent], numToSend - numSent, &numSent);
230*35238bceSAndroid Build Coastguard Worker
231*35238bceSAndroid Build Coastguard Worker if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
232*35238bceSAndroid Build Coastguard Worker XE_FAIL("Connection closed");
233*35238bceSAndroid Build Coastguard Worker else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
234*35238bceSAndroid Build Coastguard Worker XE_FAIL("Connection terminated");
235*35238bceSAndroid Build Coastguard Worker else if (result == DE_SOCKETRESULT_ERROR)
236*35238bceSAndroid Build Coastguard Worker XE_FAIL("Socket error");
237*35238bceSAndroid Build Coastguard Worker else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
238*35238bceSAndroid Build Coastguard Worker {
239*35238bceSAndroid Build Coastguard Worker // \note Socket should not be in non-blocking mode.
240*35238bceSAndroid Build Coastguard Worker DE_ASSERT(numSent == 0);
241*35238bceSAndroid Build Coastguard Worker deYield();
242*35238bceSAndroid Build Coastguard Worker }
243*35238bceSAndroid Build Coastguard Worker else
244*35238bceSAndroid Build Coastguard Worker DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
245*35238bceSAndroid Build Coastguard Worker }
246*35238bceSAndroid Build Coastguard Worker }
247*35238bceSAndroid Build Coastguard Worker }
248*35238bceSAndroid Build Coastguard Worker catch (const std::exception &e)
249*35238bceSAndroid Build Coastguard Worker {
250*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_ERROR, e.what());
251*35238bceSAndroid Build Coastguard Worker }
252*35238bceSAndroid Build Coastguard Worker }
253*35238bceSAndroid Build Coastguard Worker
stop(void)254*35238bceSAndroid Build Coastguard Worker void TcpIpSendThread::stop(void)
255*35238bceSAndroid Build Coastguard Worker {
256*35238bceSAndroid Build Coastguard Worker if (m_isRunning)
257*35238bceSAndroid Build Coastguard Worker {
258*35238bceSAndroid Build Coastguard Worker m_buffer.cancel();
259*35238bceSAndroid Build Coastguard Worker join();
260*35238bceSAndroid Build Coastguard Worker m_isRunning = false;
261*35238bceSAndroid Build Coastguard Worker }
262*35238bceSAndroid Build Coastguard Worker }
263*35238bceSAndroid Build Coastguard Worker
264*35238bceSAndroid Build Coastguard Worker // TcpIpRecvThread
265*35238bceSAndroid Build Coastguard Worker
TcpIpRecvThread(de::Socket & socket,TcpIpLinkState & state)266*35238bceSAndroid Build Coastguard Worker TcpIpRecvThread::TcpIpRecvThread(de::Socket &socket, TcpIpLinkState &state)
267*35238bceSAndroid Build Coastguard Worker : m_socket(socket)
268*35238bceSAndroid Build Coastguard Worker , m_state(state)
269*35238bceSAndroid Build Coastguard Worker , m_curMsgPos(0)
270*35238bceSAndroid Build Coastguard Worker , m_isRunning(false)
271*35238bceSAndroid Build Coastguard Worker {
272*35238bceSAndroid Build Coastguard Worker }
273*35238bceSAndroid Build Coastguard Worker
~TcpIpRecvThread(void)274*35238bceSAndroid Build Coastguard Worker TcpIpRecvThread::~TcpIpRecvThread(void)
275*35238bceSAndroid Build Coastguard Worker {
276*35238bceSAndroid Build Coastguard Worker }
277*35238bceSAndroid Build Coastguard Worker
start(void)278*35238bceSAndroid Build Coastguard Worker void TcpIpRecvThread::start(void)
279*35238bceSAndroid Build Coastguard Worker {
280*35238bceSAndroid Build Coastguard Worker DE_ASSERT(!m_isRunning);
281*35238bceSAndroid Build Coastguard Worker
282*35238bceSAndroid Build Coastguard Worker // Reset state.
283*35238bceSAndroid Build Coastguard Worker m_curMsgPos = 0;
284*35238bceSAndroid Build Coastguard Worker m_isRunning = true;
285*35238bceSAndroid Build Coastguard Worker
286*35238bceSAndroid Build Coastguard Worker de::Thread::start();
287*35238bceSAndroid Build Coastguard Worker }
288*35238bceSAndroid Build Coastguard Worker
run(void)289*35238bceSAndroid Build Coastguard Worker void TcpIpRecvThread::run(void)
290*35238bceSAndroid Build Coastguard Worker {
291*35238bceSAndroid Build Coastguard Worker try
292*35238bceSAndroid Build Coastguard Worker {
293*35238bceSAndroid Build Coastguard Worker for (;;)
294*35238bceSAndroid Build Coastguard Worker {
295*35238bceSAndroid Build Coastguard Worker bool hasHeader = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
296*35238bceSAndroid Build Coastguard Worker bool hasPayload = false;
297*35238bceSAndroid Build Coastguard Worker size_t messageSize = 0;
298*35238bceSAndroid Build Coastguard Worker xs::MessageType messageType = (xs::MessageType)0;
299*35238bceSAndroid Build Coastguard Worker
300*35238bceSAndroid Build Coastguard Worker if (hasHeader)
301*35238bceSAndroid Build Coastguard Worker {
302*35238bceSAndroid Build Coastguard Worker xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
303*35238bceSAndroid Build Coastguard Worker hasPayload = m_curMsgPos >= messageSize;
304*35238bceSAndroid Build Coastguard Worker }
305*35238bceSAndroid Build Coastguard Worker
306*35238bceSAndroid Build Coastguard Worker if (hasPayload)
307*35238bceSAndroid Build Coastguard Worker {
308*35238bceSAndroid Build Coastguard Worker // Process message.
309*35238bceSAndroid Build Coastguard Worker handleMessage(messageType,
310*35238bceSAndroid Build Coastguard Worker m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL,
311*35238bceSAndroid Build Coastguard Worker messageSize - xs::MESSAGE_HEADER_SIZE);
312*35238bceSAndroid Build Coastguard Worker m_curMsgPos = 0;
313*35238bceSAndroid Build Coastguard Worker }
314*35238bceSAndroid Build Coastguard Worker else
315*35238bceSAndroid Build Coastguard Worker {
316*35238bceSAndroid Build Coastguard Worker // Try to receive missing bytes.
317*35238bceSAndroid Build Coastguard Worker size_t curSize = hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
318*35238bceSAndroid Build Coastguard Worker size_t bytesToRecv = curSize - m_curMsgPos;
319*35238bceSAndroid Build Coastguard Worker size_t numRecv = 0;
320*35238bceSAndroid Build Coastguard Worker deSocketResult result = DE_SOCKETRESULT_LAST;
321*35238bceSAndroid Build Coastguard Worker
322*35238bceSAndroid Build Coastguard Worker if (m_curMsgBuf.size() < curSize)
323*35238bceSAndroid Build Coastguard Worker m_curMsgBuf.resize(curSize);
324*35238bceSAndroid Build Coastguard Worker
325*35238bceSAndroid Build Coastguard Worker result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
326*35238bceSAndroid Build Coastguard Worker
327*35238bceSAndroid Build Coastguard Worker if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
328*35238bceSAndroid Build Coastguard Worker XE_FAIL("Connection closed");
329*35238bceSAndroid Build Coastguard Worker else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
330*35238bceSAndroid Build Coastguard Worker XE_FAIL("Connection terminated");
331*35238bceSAndroid Build Coastguard Worker else if (result == DE_SOCKETRESULT_ERROR)
332*35238bceSAndroid Build Coastguard Worker XE_FAIL("Socket error");
333*35238bceSAndroid Build Coastguard Worker else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
334*35238bceSAndroid Build Coastguard Worker {
335*35238bceSAndroid Build Coastguard Worker // \note Socket should not be in non-blocking mode.
336*35238bceSAndroid Build Coastguard Worker DE_ASSERT(numRecv == 0);
337*35238bceSAndroid Build Coastguard Worker deYield();
338*35238bceSAndroid Build Coastguard Worker }
339*35238bceSAndroid Build Coastguard Worker else
340*35238bceSAndroid Build Coastguard Worker {
341*35238bceSAndroid Build Coastguard Worker DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
342*35238bceSAndroid Build Coastguard Worker DE_ASSERT(numRecv <= bytesToRecv);
343*35238bceSAndroid Build Coastguard Worker m_curMsgPos += numRecv;
344*35238bceSAndroid Build Coastguard Worker // Continue receiving bytes / handle message in next iter.
345*35238bceSAndroid Build Coastguard Worker }
346*35238bceSAndroid Build Coastguard Worker }
347*35238bceSAndroid Build Coastguard Worker }
348*35238bceSAndroid Build Coastguard Worker }
349*35238bceSAndroid Build Coastguard Worker catch (const std::exception &e)
350*35238bceSAndroid Build Coastguard Worker {
351*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_ERROR, e.what());
352*35238bceSAndroid Build Coastguard Worker }
353*35238bceSAndroid Build Coastguard Worker }
354*35238bceSAndroid Build Coastguard Worker
stop(void)355*35238bceSAndroid Build Coastguard Worker void TcpIpRecvThread::stop(void)
356*35238bceSAndroid Build Coastguard Worker {
357*35238bceSAndroid Build Coastguard Worker if (m_isRunning)
358*35238bceSAndroid Build Coastguard Worker {
359*35238bceSAndroid Build Coastguard Worker // \note Socket must be closed before terminating receive thread.
360*35238bceSAndroid Build Coastguard Worker XE_CHECK(!m_socket.isReceiveOpen());
361*35238bceSAndroid Build Coastguard Worker
362*35238bceSAndroid Build Coastguard Worker join();
363*35238bceSAndroid Build Coastguard Worker m_isRunning = false;
364*35238bceSAndroid Build Coastguard Worker }
365*35238bceSAndroid Build Coastguard Worker }
366*35238bceSAndroid Build Coastguard Worker
handleMessage(xs::MessageType messageType,const uint8_t * data,size_t dataSize)367*35238bceSAndroid Build Coastguard Worker void TcpIpRecvThread::handleMessage(xs::MessageType messageType, const uint8_t *data, size_t dataSize)
368*35238bceSAndroid Build Coastguard Worker {
369*35238bceSAndroid Build Coastguard Worker switch (messageType)
370*35238bceSAndroid Build Coastguard Worker {
371*35238bceSAndroid Build Coastguard Worker case xs::MESSAGETYPE_KEEPALIVE:
372*35238bceSAndroid Build Coastguard Worker m_state.onKeepaliveReceived();
373*35238bceSAndroid Build Coastguard Worker break;
374*35238bceSAndroid Build Coastguard Worker
375*35238bceSAndroid Build Coastguard Worker case xs::MESSAGETYPE_PROCESS_STARTED:
376*35238bceSAndroid Build Coastguard Worker XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
377*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
378*35238bceSAndroid Build Coastguard Worker break;
379*35238bceSAndroid Build Coastguard Worker
380*35238bceSAndroid Build Coastguard Worker case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
381*35238bceSAndroid Build Coastguard Worker {
382*35238bceSAndroid Build Coastguard Worker xs::ProcessLaunchFailedMessage msg(data, dataSize);
383*35238bceSAndroid Build Coastguard Worker XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING,
384*35238bceSAndroid Build Coastguard Worker "Unexpected PROCESS_LAUNCH_FAILED message");
385*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
386*35238bceSAndroid Build Coastguard Worker break;
387*35238bceSAndroid Build Coastguard Worker }
388*35238bceSAndroid Build Coastguard Worker
389*35238bceSAndroid Build Coastguard Worker case xs::MESSAGETYPE_PROCESS_FINISHED:
390*35238bceSAndroid Build Coastguard Worker {
391*35238bceSAndroid Build Coastguard Worker XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
392*35238bceSAndroid Build Coastguard Worker xs::ProcessFinishedMessage msg(data, dataSize);
393*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
394*35238bceSAndroid Build Coastguard Worker DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
395*35238bceSAndroid Build Coastguard Worker break;
396*35238bceSAndroid Build Coastguard Worker }
397*35238bceSAndroid Build Coastguard Worker
398*35238bceSAndroid Build Coastguard Worker case xs::MESSAGETYPE_PROCESS_LOG_DATA:
399*35238bceSAndroid Build Coastguard Worker case xs::MESSAGETYPE_INFO:
400*35238bceSAndroid Build Coastguard Worker // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
401*35238bceSAndroid Build Coastguard Worker if (data[dataSize - 1] == 0)
402*35238bceSAndroid Build Coastguard Worker dataSize -= 1;
403*35238bceSAndroid Build Coastguard Worker
404*35238bceSAndroid Build Coastguard Worker if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
405*35238bceSAndroid Build Coastguard Worker {
406*35238bceSAndroid Build Coastguard Worker XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING,
407*35238bceSAndroid Build Coastguard Worker "Unexpected PROCESS_LOG_DATA message");
408*35238bceSAndroid Build Coastguard Worker m_state.onTestLogData(&data[0], dataSize);
409*35238bceSAndroid Build Coastguard Worker }
410*35238bceSAndroid Build Coastguard Worker else
411*35238bceSAndroid Build Coastguard Worker m_state.onInfoLogData(&data[0], dataSize);
412*35238bceSAndroid Build Coastguard Worker break;
413*35238bceSAndroid Build Coastguard Worker
414*35238bceSAndroid Build Coastguard Worker default:
415*35238bceSAndroid Build Coastguard Worker XE_FAIL("Unknown message");
416*35238bceSAndroid Build Coastguard Worker }
417*35238bceSAndroid Build Coastguard Worker }
418*35238bceSAndroid Build Coastguard Worker
419*35238bceSAndroid Build Coastguard Worker // TcpIpLink
420*35238bceSAndroid Build Coastguard Worker
TcpIpLink(void)421*35238bceSAndroid Build Coastguard Worker TcpIpLink::TcpIpLink(void)
422*35238bceSAndroid Build Coastguard Worker : m_state(COMMLINKSTATE_ERROR, "Not connected")
423*35238bceSAndroid Build Coastguard Worker , m_sendThread(m_socket, m_state)
424*35238bceSAndroid Build Coastguard Worker , m_recvThread(m_socket, m_state)
425*35238bceSAndroid Build Coastguard Worker , m_keepaliveTimer(DE_NULL)
426*35238bceSAndroid Build Coastguard Worker {
427*35238bceSAndroid Build Coastguard Worker m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
428*35238bceSAndroid Build Coastguard Worker XE_CHECK(m_keepaliveTimer);
429*35238bceSAndroid Build Coastguard Worker }
430*35238bceSAndroid Build Coastguard Worker
~TcpIpLink(void)431*35238bceSAndroid Build Coastguard Worker TcpIpLink::~TcpIpLink(void)
432*35238bceSAndroid Build Coastguard Worker {
433*35238bceSAndroid Build Coastguard Worker try
434*35238bceSAndroid Build Coastguard Worker {
435*35238bceSAndroid Build Coastguard Worker closeConnection();
436*35238bceSAndroid Build Coastguard Worker }
437*35238bceSAndroid Build Coastguard Worker catch (...)
438*35238bceSAndroid Build Coastguard Worker {
439*35238bceSAndroid Build Coastguard Worker // Can't do much except to ignore error.
440*35238bceSAndroid Build Coastguard Worker }
441*35238bceSAndroid Build Coastguard Worker deTimer_destroy(m_keepaliveTimer);
442*35238bceSAndroid Build Coastguard Worker }
443*35238bceSAndroid Build Coastguard Worker
closeConnection(void)444*35238bceSAndroid Build Coastguard Worker void TcpIpLink::closeConnection(void)
445*35238bceSAndroid Build Coastguard Worker {
446*35238bceSAndroid Build Coastguard Worker {
447*35238bceSAndroid Build Coastguard Worker deSocketState state = m_socket.getState();
448*35238bceSAndroid Build Coastguard Worker if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
449*35238bceSAndroid Build Coastguard Worker m_socket.shutdown();
450*35238bceSAndroid Build Coastguard Worker }
451*35238bceSAndroid Build Coastguard Worker
452*35238bceSAndroid Build Coastguard Worker if (deTimer_isActive(m_keepaliveTimer))
453*35238bceSAndroid Build Coastguard Worker deTimer_disable(m_keepaliveTimer);
454*35238bceSAndroid Build Coastguard Worker
455*35238bceSAndroid Build Coastguard Worker if (m_sendThread.isRunning())
456*35238bceSAndroid Build Coastguard Worker m_sendThread.stop();
457*35238bceSAndroid Build Coastguard Worker
458*35238bceSAndroid Build Coastguard Worker if (m_recvThread.isRunning())
459*35238bceSAndroid Build Coastguard Worker m_recvThread.stop();
460*35238bceSAndroid Build Coastguard Worker
461*35238bceSAndroid Build Coastguard Worker if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
462*35238bceSAndroid Build Coastguard Worker m_socket.close();
463*35238bceSAndroid Build Coastguard Worker }
464*35238bceSAndroid Build Coastguard Worker
connect(const de::SocketAddress & address)465*35238bceSAndroid Build Coastguard Worker void TcpIpLink::connect(const de::SocketAddress &address)
466*35238bceSAndroid Build Coastguard Worker {
467*35238bceSAndroid Build Coastguard Worker XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
468*35238bceSAndroid Build Coastguard Worker XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
469*35238bceSAndroid Build Coastguard Worker XE_CHECK(!m_sendThread.isRunning());
470*35238bceSAndroid Build Coastguard Worker XE_CHECK(!m_recvThread.isRunning());
471*35238bceSAndroid Build Coastguard Worker
472*35238bceSAndroid Build Coastguard Worker m_socket.connect(address);
473*35238bceSAndroid Build Coastguard Worker
474*35238bceSAndroid Build Coastguard Worker try
475*35238bceSAndroid Build Coastguard Worker {
476*35238bceSAndroid Build Coastguard Worker // Clear error and set state to ready.
477*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_READY, "");
478*35238bceSAndroid Build Coastguard Worker m_state.onKeepaliveReceived();
479*35238bceSAndroid Build Coastguard Worker
480*35238bceSAndroid Build Coastguard Worker // Launch threads.
481*35238bceSAndroid Build Coastguard Worker m_sendThread.start();
482*35238bceSAndroid Build Coastguard Worker m_recvThread.start();
483*35238bceSAndroid Build Coastguard Worker
484*35238bceSAndroid Build Coastguard Worker XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
485*35238bceSAndroid Build Coastguard Worker }
486*35238bceSAndroid Build Coastguard Worker catch (const std::exception &e)
487*35238bceSAndroid Build Coastguard Worker {
488*35238bceSAndroid Build Coastguard Worker closeConnection();
489*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_ERROR, e.what());
490*35238bceSAndroid Build Coastguard Worker throw;
491*35238bceSAndroid Build Coastguard Worker }
492*35238bceSAndroid Build Coastguard Worker }
493*35238bceSAndroid Build Coastguard Worker
disconnect(void)494*35238bceSAndroid Build Coastguard Worker void TcpIpLink::disconnect(void)
495*35238bceSAndroid Build Coastguard Worker {
496*35238bceSAndroid Build Coastguard Worker try
497*35238bceSAndroid Build Coastguard Worker {
498*35238bceSAndroid Build Coastguard Worker closeConnection();
499*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
500*35238bceSAndroid Build Coastguard Worker }
501*35238bceSAndroid Build Coastguard Worker catch (const std::exception &e)
502*35238bceSAndroid Build Coastguard Worker {
503*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_ERROR, e.what());
504*35238bceSAndroid Build Coastguard Worker }
505*35238bceSAndroid Build Coastguard Worker }
506*35238bceSAndroid Build Coastguard Worker
reset(void)507*35238bceSAndroid Build Coastguard Worker void TcpIpLink::reset(void)
508*35238bceSAndroid Build Coastguard Worker {
509*35238bceSAndroid Build Coastguard Worker // \note Just clears error state if we are connected.
510*35238bceSAndroid Build Coastguard Worker if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
511*35238bceSAndroid Build Coastguard Worker {
512*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_READY, "");
513*35238bceSAndroid Build Coastguard Worker
514*35238bceSAndroid Build Coastguard Worker // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
515*35238bceSAndroid Build Coastguard Worker }
516*35238bceSAndroid Build Coastguard Worker else
517*35238bceSAndroid Build Coastguard Worker disconnect(); // Abnormal state/usage. Disconnect socket.
518*35238bceSAndroid Build Coastguard Worker }
519*35238bceSAndroid Build Coastguard Worker
keepaliveTimerCallback(void * ptr)520*35238bceSAndroid Build Coastguard Worker void TcpIpLink::keepaliveTimerCallback(void *ptr)
521*35238bceSAndroid Build Coastguard Worker {
522*35238bceSAndroid Build Coastguard Worker TcpIpLink *link = static_cast<TcpIpLink *>(ptr);
523*35238bceSAndroid Build Coastguard Worker uint64_t lastKeepalive = link->m_state.getLastKeepaliveRecevied();
524*35238bceSAndroid Build Coastguard Worker uint64_t curTime = deGetMicroseconds();
525*35238bceSAndroid Build Coastguard Worker
526*35238bceSAndroid Build Coastguard Worker // Check for timeout.
527*35238bceSAndroid Build Coastguard Worker if ((int64_t)curTime - (int64_t)lastKeepalive > xs::KEEPALIVE_TIMEOUT * 1000)
528*35238bceSAndroid Build Coastguard Worker link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
529*35238bceSAndroid Build Coastguard Worker
530*35238bceSAndroid Build Coastguard Worker // Enqueue new keepalive.
531*35238bceSAndroid Build Coastguard Worker try
532*35238bceSAndroid Build Coastguard Worker {
533*35238bceSAndroid Build Coastguard Worker writeKeepalive(link->m_sendThread.getBuffer());
534*35238bceSAndroid Build Coastguard Worker }
535*35238bceSAndroid Build Coastguard Worker catch (const de::BlockBuffer<uint8_t>::CanceledException &)
536*35238bceSAndroid Build Coastguard Worker {
537*35238bceSAndroid Build Coastguard Worker // Ignore. Can happen in connection teardown.
538*35238bceSAndroid Build Coastguard Worker }
539*35238bceSAndroid Build Coastguard Worker }
540*35238bceSAndroid Build Coastguard Worker
getState(void) const541*35238bceSAndroid Build Coastguard Worker CommLinkState TcpIpLink::getState(void) const
542*35238bceSAndroid Build Coastguard Worker {
543*35238bceSAndroid Build Coastguard Worker return m_state.getState();
544*35238bceSAndroid Build Coastguard Worker }
545*35238bceSAndroid Build Coastguard Worker
getState(std::string & message) const546*35238bceSAndroid Build Coastguard Worker CommLinkState TcpIpLink::getState(std::string &message) const
547*35238bceSAndroid Build Coastguard Worker {
548*35238bceSAndroid Build Coastguard Worker return m_state.getState(message);
549*35238bceSAndroid Build Coastguard Worker }
550*35238bceSAndroid Build Coastguard Worker
setCallbacks(StateChangedFunc stateChangedCallback,LogDataFunc testLogDataCallback,LogDataFunc infoLogDataCallback,void * userPtr)551*35238bceSAndroid Build Coastguard Worker void TcpIpLink::setCallbacks(StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback,
552*35238bceSAndroid Build Coastguard Worker LogDataFunc infoLogDataCallback, void *userPtr)
553*35238bceSAndroid Build Coastguard Worker {
554*35238bceSAndroid Build Coastguard Worker m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
555*35238bceSAndroid Build Coastguard Worker }
556*35238bceSAndroid Build Coastguard Worker
startTestProcess(const char * name,const char * params,const char * workingDir,const char * caseList)557*35238bceSAndroid Build Coastguard Worker void TcpIpLink::startTestProcess(const char *name, const char *params, const char *workingDir, const char *caseList)
558*35238bceSAndroid Build Coastguard Worker {
559*35238bceSAndroid Build Coastguard Worker XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
560*35238bceSAndroid Build Coastguard Worker
561*35238bceSAndroid Build Coastguard Worker m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
562*35238bceSAndroid Build Coastguard Worker writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
563*35238bceSAndroid Build Coastguard Worker }
564*35238bceSAndroid Build Coastguard Worker
stopTestProcess(void)565*35238bceSAndroid Build Coastguard Worker void TcpIpLink::stopTestProcess(void)
566*35238bceSAndroid Build Coastguard Worker {
567*35238bceSAndroid Build Coastguard Worker XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
568*35238bceSAndroid Build Coastguard Worker writeStopExecution(m_sendThread.getBuffer());
569*35238bceSAndroid Build Coastguard Worker }
570*35238bceSAndroid Build Coastguard Worker
571*35238bceSAndroid Build Coastguard Worker } // namespace xe
572