1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <iomanip>
7
8 #include <boost/asio/write.hpp>
9
10 #include <vsomeip/constants.hpp>
11 #include <vsomeip/defines.hpp>
12 #include <vsomeip/internal/logger.hpp>
13
14 #include "../include/endpoint_host.hpp"
15 #include "../../routing/include/routing_host.hpp"
16 #include "../include/tcp_client_endpoint_impl.hpp"
17 #include "../../utility/include/utility.hpp"
18 #include "../../utility/include/byteorder.hpp"
19
20 namespace ip = boost::asio::ip;
21
22 namespace vsomeip_v3 {
23
tcp_client_endpoint_impl(const std::shared_ptr<endpoint_host> & _endpoint_host,const std::shared_ptr<routing_host> & _routing_host,const endpoint_type & _local,const endpoint_type & _remote,boost::asio::io_service & _io,const std::shared_ptr<configuration> & _configuration)24 tcp_client_endpoint_impl::tcp_client_endpoint_impl(
25 const std::shared_ptr<endpoint_host>& _endpoint_host,
26 const std::shared_ptr<routing_host>& _routing_host,
27 const endpoint_type& _local,
28 const endpoint_type& _remote,
29 boost::asio::io_service &_io,
30 const std::shared_ptr<configuration>& _configuration)
31 : tcp_client_endpoint_base_impl(_endpoint_host, _routing_host, _local,
32 _remote, _io,
33 _configuration->get_max_message_size_reliable(
34 _remote.address().to_string(),
35 _remote.port()),
36 _configuration->get_endpoint_queue_limit(
37 _remote.address().to_string(),
38 _remote.port()),
39 _configuration),
40 recv_buffer_size_initial_(VSOMEIP_SOMEIP_HEADER_SIZE),
41 recv_buffer_(std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0)),
42 shrink_count_(0),
43 buffer_shrink_threshold_(configuration_->get_buffer_shrink_threshold()),
44 remote_address_(_remote.address()),
45 remote_port_(_remote.port()),
46 last_cookie_sent_(std::chrono::steady_clock::now() - std::chrono::seconds(11)),
47 // send timeout after 2/3 of configured ttl, warning after 1/3
48 send_timeout_(configuration_->get_sd_ttl() * 666),
49 send_timeout_warning_(send_timeout_ / 2),
50 tcp_restart_aborts_max_(configuration_->get_max_tcp_restart_aborts()),
51 tcp_connect_time_max_(configuration_->get_max_tcp_connect_time()),
52 aborted_restart_count_(0),
53 is_sending_(false),
54 sent_timer_(_io) {
55
56 is_supporting_magic_cookies_ = true;
57 }
58
~tcp_client_endpoint_impl()59 tcp_client_endpoint_impl::~tcp_client_endpoint_impl() {
60 std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
61 if (its_host) {
62 its_host->release_port(local_.port(), true);
63 }
64 }
65
is_local() const66 bool tcp_client_endpoint_impl::is_local() const {
67 return false;
68 }
69
start()70 void tcp_client_endpoint_impl::start() {
71 strand_.dispatch(std::bind(&client_endpoint_impl::connect,
72 this->shared_from_this()));
73 }
74
restart(bool _force)75 void tcp_client_endpoint_impl::restart(bool _force) {
76 auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
77 auto restart_func = [self, _force] {
78 if (!_force && self->state_ == cei_state_e::CONNECTING) {
79 std::chrono::steady_clock::time_point its_current
80 = std::chrono::steady_clock::now();
81 long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
82 its_current - self->connect_timepoint_).count();
83 if (self->aborted_restart_count_ < self->tcp_restart_aborts_max_
84 && its_connect_duration < self->tcp_connect_time_max_) {
85 self->aborted_restart_count_++;
86 return;
87 } else {
88 VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
89 << self->tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
90 << its_connect_duration;
91 }
92 }
93 self->state_ = cei_state_e::CONNECTING;
94 std::string address_port_local;
95 {
96 std::lock_guard<std::mutex> its_lock(self->socket_mutex_);
97 address_port_local = self->get_address_port_local();
98 self->shutdown_and_close_socket_unlocked(true);
99 self->recv_buffer_ = std::make_shared<message_buffer_t>(self->recv_buffer_size_initial_, 0);
100 }
101 self->was_not_connected_ = true;
102 self->reconnect_counter_ = 0;
103 {
104 std::lock_guard<std::mutex> its_lock(self->mutex_);
105 for (const auto&m : self->queue_) {
106 const service_t its_service = VSOMEIP_BYTES_TO_WORD(
107 (*m)[VSOMEIP_SERVICE_POS_MIN],
108 (*m)[VSOMEIP_SERVICE_POS_MAX]);
109 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
110 (*m)[VSOMEIP_METHOD_POS_MIN],
111 (*m)[VSOMEIP_METHOD_POS_MAX]);
112 const client_t its_client = VSOMEIP_BYTES_TO_WORD(
113 (*m)[VSOMEIP_CLIENT_POS_MIN],
114 (*m)[VSOMEIP_CLIENT_POS_MAX]);
115 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
116 (*m)[VSOMEIP_SESSION_POS_MIN],
117 (*m)[VSOMEIP_SESSION_POS_MAX]);
118 VSOMEIP_WARNING << "tce::restart: dropping message: "
119 << "remote:" << self->get_address_port_remote() << " ("
120 << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
121 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
122 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
123 << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
124 << " size: " << std::dec << m->size();
125 }
126 self->queue_.clear();
127 self->queue_size_ = 0;
128 }
129 VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
130 << " remote: " << self->get_address_port_remote();
131 self->start_connect_timer();
132 };
133 // bind to strand_ to avoid socket closure if
134 // parallel socket operation is currently active
135 strand_.dispatch(restart_func);
136 }
137
connect()138 void tcp_client_endpoint_impl::connect() {
139 std::lock_guard<std::mutex> its_lock(socket_mutex_);
140 boost::system::error_code its_error;
141 socket_->open(remote_.protocol(), its_error);
142
143 if (!its_error || its_error == boost::asio::error::already_open) {
144 // Nagle algorithm off
145 socket_->set_option(ip::tcp::no_delay(true), its_error);
146 if (its_error) {
147 VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't disable "
148 << "Nagle algorithm: " << its_error.message()
149 << " remote:" << get_address_port_remote();
150 }
151
152 socket_->set_option(boost::asio::socket_base::keep_alive(true), its_error);
153 if (its_error) {
154 VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
155 << "keep_alive: " << its_error.message()
156 << " remote:" << get_address_port_remote();
157 }
158
159 // Enable SO_REUSEADDR to avoid bind problems with services going offline
160 // and coming online again and the user has specified only a small number
161 // of ports in the clients section for one service instance
162 socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
163 if (its_error) {
164 VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
165 << "SO_REUSEADDR: " << its_error.message()
166 << " remote:" << get_address_port_remote();
167 }
168 socket_->set_option(boost::asio::socket_base::linger(true, 0), its_error);
169 if (its_error) {
170 VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
171 << "SO_LINGER: " << its_error.message()
172 << " remote:" << get_address_port_remote();
173 }
174
175 #ifndef _WIN32
176 // If specified, bind to device
177 std::string its_device(configuration_->get_device());
178 if (its_device != "") {
179 if (setsockopt(socket_->native_handle(),
180 SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
181 VSOMEIP_WARNING << "TCP Client: Could not bind to device \"" << its_device << "\"";
182 }
183 }
184 #endif
185
186 // In case a client endpoint port was configured,
187 // bind to it before connecting
188 if (local_.port() != ILLEGAL_PORT) {
189 boost::system::error_code its_bind_error;
190 socket_->bind(local_, its_bind_error);
191 if(its_bind_error) {
192 VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
193 "Error binding socket: " << its_bind_error.message()
194 << " local: " << local_.address().to_string()
195 << ":" << std::dec << local_.port()
196 << " remote:" << get_address_port_remote();
197
198 std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
199 if (its_host) {
200 // set new client port depending on service / instance / remote port
201 if (!its_host->on_bind_error(shared_from_this(), remote_port_)) {
202 VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
203 "Failed to set new local port for tce: "
204 << " local: " << local_.address().to_string()
205 << ":" << std::dec << local_.port()
206 << " remote:" << get_address_port_remote();
207 } else {
208 local_.port(local_port_);
209 VSOMEIP_INFO << "tcp_client_endpoint::connect: "
210 "Using new new local port for tce: "
211 << " local: " << local_.address().to_string()
212 << ":" << std::dec << local_.port()
213 << " remote:" << get_address_port_remote();
214 }
215 }
216 try {
217 // don't connect on bind error to avoid using a random port
218 strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
219 shared_from_this(), its_bind_error));
220 } catch (const std::exception &e) {
221 VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
222 << e.what()
223 << " local: " << local_.address().to_string()
224 << ":" << std::dec << local_.port()
225 << " remote:" << get_address_port_remote();
226 }
227 return;
228 }
229 return;
230 }
231
232 state_ = cei_state_e::CONNECTING;
233 connect_timepoint_ = std::chrono::steady_clock::now();
234 aborted_restart_count_ = 0;
235 socket_->async_connect(
236 remote_,
237 strand_.wrap(
238 std::bind(
239 &tcp_client_endpoint_base_impl::connect_cbk,
240 shared_from_this(),
241 std::placeholders::_1
242 )
243 )
244 );
245 } else {
246 VSOMEIP_WARNING << "tcp_client_endpoint::connect: Error opening socket: "
247 << its_error.message() << " remote:" << get_address_port_remote();
248 strand_.post(std::bind(&tcp_client_endpoint_base_impl::connect_cbk,
249 shared_from_this(), its_error));
250 }
251 }
252
receive()253 void tcp_client_endpoint_impl::receive() {
254 message_buffer_ptr_t its_recv_buffer;
255 {
256 std::lock_guard<std::mutex> its_lock(socket_mutex_);
257 its_recv_buffer = recv_buffer_;
258 }
259 auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
260 strand_.dispatch([self, &its_recv_buffer](){
261 self->receive(its_recv_buffer, 0, 0);
262 });
263 }
264
receive(message_buffer_ptr_t _recv_buffer,std::size_t _recv_buffer_size,std::size_t _missing_capacity)265 void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
266 std::size_t _recv_buffer_size,
267 std::size_t _missing_capacity) {
268 std::lock_guard<std::mutex> its_lock(socket_mutex_);
269 if(socket_->is_open()) {
270 const std::size_t its_capacity(_recv_buffer->capacity());
271 size_t buffer_size = its_capacity - _recv_buffer_size;
272 try {
273 if (_missing_capacity) {
274 if (_missing_capacity > MESSAGE_SIZE_UNLIMITED) {
275 VSOMEIP_ERROR << "Missing receive buffer capacity exceeds allowed maximum!";
276 return;
277 }
278 const std::size_t its_required_capacity(_recv_buffer_size + _missing_capacity);
279 if (its_capacity < its_required_capacity) {
280 _recv_buffer->reserve(its_required_capacity);
281 _recv_buffer->resize(its_required_capacity, 0x0);
282 if (_recv_buffer->size() > 1048576) {
283 VSOMEIP_INFO << "tce: recv_buffer size is: " <<
284 _recv_buffer->size()
285 << " local: " << get_address_port_local()
286 << " remote: " << get_address_port_remote();
287 }
288 }
289 buffer_size = _missing_capacity;
290 } else if (buffer_shrink_threshold_
291 && shrink_count_ > buffer_shrink_threshold_
292 && _recv_buffer_size == 0) {
293 _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
294 _recv_buffer->shrink_to_fit();
295 buffer_size = recv_buffer_size_initial_;
296 shrink_count_ = 0;
297 }
298 } catch (const std::exception &e) {
299 handle_recv_buffer_exception(e, _recv_buffer, _recv_buffer_size);
300 // don't start receiving again
301 return;
302 }
303 socket_->async_receive(
304 boost::asio::buffer(&(*_recv_buffer)[_recv_buffer_size], buffer_size),
305 strand_.wrap(
306 std::bind(
307 &tcp_client_endpoint_impl::receive_cbk,
308 std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()),
309 std::placeholders::_1,
310 std::placeholders::_2,
311 _recv_buffer,
312 _recv_buffer_size
313 )
314 )
315 );
316 }
317 }
318
send_queued(message_buffer_ptr_t _buffer)319 void tcp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
320 const service_t its_service = VSOMEIP_BYTES_TO_WORD(
321 (*_buffer)[VSOMEIP_SERVICE_POS_MIN],
322 (*_buffer)[VSOMEIP_SERVICE_POS_MAX]);
323 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
324 (*_buffer)[VSOMEIP_METHOD_POS_MIN],
325 (*_buffer)[VSOMEIP_METHOD_POS_MAX]);
326 const client_t its_client = VSOMEIP_BYTES_TO_WORD(
327 (*_buffer)[VSOMEIP_CLIENT_POS_MIN],
328 (*_buffer)[VSOMEIP_CLIENT_POS_MAX]);
329 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
330 (*_buffer)[VSOMEIP_SESSION_POS_MIN],
331 (*_buffer)[VSOMEIP_SESSION_POS_MAX]);
332
333 if (has_enabled_magic_cookies_) {
334 const std::chrono::steady_clock::time_point now =
335 std::chrono::steady_clock::now();
336 if (std::chrono::duration_cast<std::chrono::milliseconds>(
337 now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
338 send_magic_cookie(_buffer);
339 last_cookie_sent_ = now;
340 }
341 }
342
343
344 #if 0
345 std::stringstream msg;
346 msg << "tcei<" << remote_.address() << ":"
347 << std::dec << remote_.port() << ">::sq: ";
348 for (std::size_t i = 0; i < _buffer->size(); i++)
349 msg << std::hex << std::setw(2) << std::setfill('0')
350 << (int)(*_buffer)[i] << " ";
351 VSOMEIP_INFO << msg.str();
352 #endif
353 {
354 std::lock_guard<std::mutex> its_lock(socket_mutex_);
355 if (socket_->is_open()) {
356 {
357 std::lock_guard<std::mutex> its_sent_lock(sent_mutex_);
358 is_sending_ = true;
359 }
360 boost::asio::async_write(
361 *socket_,
362 boost::asio::buffer(*_buffer),
363 std::bind(
364 &tcp_client_endpoint_impl::write_completion_condition,
365 std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
366 std::placeholders::_1,
367 std::placeholders::_2,
368 _buffer->size(),
369 its_service, its_method, its_client, its_session,
370 std::chrono::steady_clock::now()),
371 strand_.wrap(
372 std::bind(
373 &tcp_client_endpoint_base_impl::send_cbk,
374 shared_from_this(),
375 std::placeholders::_1,
376 std::placeholders::_2,
377 _buffer
378 ))
379 );
380 }
381 }
382 }
383
get_configured_times_from_endpoint(service_t _service,method_t _method,std::chrono::nanoseconds * _debouncing,std::chrono::nanoseconds * _maximum_retention) const384 void tcp_client_endpoint_impl::get_configured_times_from_endpoint(
385 service_t _service, method_t _method,
386 std::chrono::nanoseconds *_debouncing,
387 std::chrono::nanoseconds *_maximum_retention) const {
388 configuration_->get_configured_timing_requests(_service,
389 remote_address_.to_string(), remote_port_, _method,
390 _debouncing, _maximum_retention);
391 }
392
get_remote_address(boost::asio::ip::address & _address) const393 bool tcp_client_endpoint_impl::get_remote_address(
394 boost::asio::ip::address &_address) const {
395 if (remote_address_.is_unspecified()) {
396 return false;
397 }
398 _address = remote_address_;
399 return true;
400 }
401
set_local_port()402 void tcp_client_endpoint_impl::set_local_port() {
403 std::lock_guard<std::mutex> its_lock(socket_mutex_);
404 boost::system::error_code its_error;
405 if (socket_->is_open()) {
406 endpoint_type its_endpoint = socket_->local_endpoint(its_error);
407 if (!its_error) {
408 local_port_ = its_endpoint.port();
409 } else {
410 VSOMEIP_WARNING << "tcp_client_endpoint_impl::set_local_port() "
411 << " couldn't get local_endpoint: " << its_error.message();
412 }
413 }
414 }
415
write_completion_condition(const boost::system::error_code & _error,std::size_t _bytes_transferred,std::size_t _bytes_to_send,service_t _service,method_t _method,client_t _client,session_t _session,const std::chrono::steady_clock::time_point _start)416 std::size_t tcp_client_endpoint_impl::write_completion_condition(
417 const boost::system::error_code& _error, std::size_t _bytes_transferred,
418 std::size_t _bytes_to_send, service_t _service, method_t _method,
419 client_t _client, session_t _session,
420 const std::chrono::steady_clock::time_point _start) {
421
422 if (_error) {
423 VSOMEIP_ERROR << "tce::write_completion_condition: "
424 << _error.message() << "(" << std::dec << _error.value()
425 << ") bytes transferred: " << std::dec << _bytes_transferred
426 << " bytes to sent: " << std::dec << _bytes_to_send << " "
427 << "remote:" << get_address_port_remote() << " ("
428 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
429 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
430 << std::hex << std::setw(4) << std::setfill('0') << _method << "."
431 << std::hex << std::setw(4) << std::setfill('0') << _session << "]";
432 return 0;
433 }
434
435 const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
436 const std::chrono::milliseconds passed = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start);
437 if (passed > send_timeout_warning_) {
438 if (passed > send_timeout_) {
439 VSOMEIP_ERROR << "tce::write_completion_condition: "
440 << _error.message() << "(" << std::dec << _error.value()
441 << ") took longer than " << std::dec << send_timeout_.count()
442 << "ms bytes transferred: " << std::dec << _bytes_transferred
443 << " bytes to sent: " << std::dec << _bytes_to_send << " "
444 << "remote:" << get_address_port_remote() << " ("
445 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
446 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
447 << std::hex << std::setw(4) << std::setfill('0') << _method << "."
448 << std::hex << std::setw(4) << std::setfill('0') << _session << "]";
449 } else {
450 VSOMEIP_WARNING << "tce::write_completion_condition: "
451 << _error.message() << "(" << std::dec << _error.value()
452 << ") took longer than " << std::dec << send_timeout_warning_.count()
453 << "ms bytes transferred: " << std::dec << _bytes_transferred
454 << " bytes to sent: " << std::dec << _bytes_to_send << " "
455 << "remote:" << get_address_port_remote() << " ("
456 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
457 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
458 << std::hex << std::setw(4) << std::setfill('0') << _method << "."
459 << std::hex << std::setw(4) << std::setfill('0') << _session << "]";
460 }
461 }
462 return _bytes_to_send - _bytes_transferred;
463 }
464
get_remote_port() const465 std::uint16_t tcp_client_endpoint_impl::get_remote_port() const {
466 return remote_port_;
467 }
468
is_reliable() const469 bool tcp_client_endpoint_impl::is_reliable() const {
470 return true;
471 }
472
is_magic_cookie(const message_buffer_ptr_t & _recv_buffer,size_t _offset) const473 bool tcp_client_endpoint_impl::is_magic_cookie(const message_buffer_ptr_t& _recv_buffer,
474 size_t _offset) const {
475 return (0 == std::memcmp(SERVICE_COOKIE, &(*_recv_buffer)[_offset], sizeof(SERVICE_COOKIE)));
476 }
477
send_magic_cookie(message_buffer_ptr_t & _buffer)478 void tcp_client_endpoint_impl::send_magic_cookie(message_buffer_ptr_t &_buffer) {
479 if (max_message_size_ == MESSAGE_SIZE_UNLIMITED
480 || max_message_size_ - _buffer->size() >=
481 VSOMEIP_SOMEIP_HEADER_SIZE + VSOMEIP_SOMEIP_MAGIC_COOKIE_SIZE) {
482 _buffer->insert(
483 _buffer->begin(),
484 CLIENT_COOKIE,
485 CLIENT_COOKIE + sizeof(CLIENT_COOKIE)
486 );
487 queue_size_ += sizeof(CLIENT_COOKIE);
488 } else {
489 VSOMEIP_WARNING << "Packet full. Cannot insert magic cookie!";
490 }
491 }
492
receive_cbk(boost::system::error_code const & _error,std::size_t _bytes,const message_buffer_ptr_t & _recv_buffer,std::size_t _recv_buffer_size)493 void tcp_client_endpoint_impl::receive_cbk(
494 boost::system::error_code const &_error, std::size_t _bytes,
495 const message_buffer_ptr_t& _recv_buffer, std::size_t _recv_buffer_size) {
496 if (_error == boost::asio::error::operation_aborted) {
497 // endpoint was stopped
498 return;
499 }
500 #if 0
501 std::stringstream msg;
502 msg << "cei::rcb (" << _error.message() << "): ";
503 for (std::size_t i = 0; i < _bytes + _recv_buffer_size; ++i)
504 msg << std::hex << std::setw(2) << std::setfill('0')
505 << (int) (_recv_buffer)[i] << " ";
506 VSOMEIP_INFO << msg.str();
507 #endif
508 std::unique_lock<std::mutex> its_lock(socket_mutex_);
509 std::shared_ptr<routing_host> its_host = routing_host_.lock();
510 if (its_host) {
511 std::uint32_t its_missing_capacity(0);
512 if (!_error && 0 < _bytes) {
513 if (_recv_buffer_size + _bytes < _recv_buffer_size) {
514 VSOMEIP_ERROR << "receive buffer overflow in tcp client endpoint ~> abort!";
515 return;
516 }
517 _recv_buffer_size += _bytes;
518
519 size_t its_iteration_gap = 0;
520 bool has_full_message(false);
521 do {
522 uint64_t read_message_size
523 = utility::get_message_size(&(*_recv_buffer)[its_iteration_gap],
524 _recv_buffer_size);
525 if (read_message_size > MESSAGE_SIZE_UNLIMITED) {
526 VSOMEIP_ERROR << "Message size exceeds allowed maximum!";
527 return;
528 }
529 uint32_t current_message_size = static_cast<uint32_t>(read_message_size);
530 has_full_message = (current_message_size > VSOMEIP_RETURN_CODE_POS
531 && current_message_size <= _recv_buffer_size);
532 if (has_full_message) {
533 bool needs_forwarding(true);
534 if (is_magic_cookie(_recv_buffer, its_iteration_gap)) {
535 has_enabled_magic_cookies_ = true;
536 } else {
537 if (has_enabled_magic_cookies_) {
538 uint32_t its_offset = find_magic_cookie(&(*_recv_buffer)[its_iteration_gap],
539 (uint32_t) _recv_buffer_size);
540 if (its_offset < current_message_size) {
541 VSOMEIP_ERROR << "Message includes Magic Cookie. Ignoring it.";
542 current_message_size = its_offset;
543 needs_forwarding = false;
544 }
545 }
546 }
547 if (needs_forwarding) {
548 if (!has_enabled_magic_cookies_) {
549 its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
550 current_message_size, this,
551 boost::asio::ip::address(),
552 VSOMEIP_ROUTING_CLIENT,
553 std::make_pair(ANY_UID, ANY_GID),
554 remote_address_,
555 remote_port_);
556 } else {
557 // Only call on_message without a magic cookie in front of the buffer!
558 if (!is_magic_cookie(_recv_buffer, its_iteration_gap)) {
559 its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
560 current_message_size, this,
561 boost::asio::ip::address(),
562 VSOMEIP_ROUTING_CLIENT,
563 std::make_pair(ANY_UID, ANY_GID),
564 remote_address_,
565 remote_port_);
566 }
567 }
568 }
569 calculate_shrink_count(_recv_buffer, _recv_buffer_size);
570 _recv_buffer_size -= current_message_size;
571 its_iteration_gap += current_message_size;
572 its_missing_capacity = 0;
573 } else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
574 const uint32_t its_offset = find_magic_cookie(
575 &(*_recv_buffer)[its_iteration_gap], _recv_buffer_size);
576 if (its_offset < _recv_buffer_size) {
577 _recv_buffer_size -= its_offset;
578 its_iteration_gap += its_offset;
579 has_full_message = true; // trigger next loop
580 VSOMEIP_ERROR << "Detected Magic Cookie within message data."
581 << " Resyncing. local: " << get_address_port_local()
582 << " remote: " << get_address_port_remote();
583 }
584 }
585
586 if (!has_full_message) {
587 if (_recv_buffer_size > VSOMEIP_RETURN_CODE_POS &&
588 ((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
589 !utility::is_valid_message_type(static_cast<message_type_e>((*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])) ||
590 !utility::is_valid_return_code(static_cast<return_code_e>((*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))
591 )) {
592 if ((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
593 VSOMEIP_ERROR << "tce: Wrong protocol version: 0x"
594 << std::hex << std::setw(2) << std::setfill('0')
595 << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS])
596 << " local: " << get_address_port_local()
597 << " remote: " << get_address_port_remote();
598 // ensure to send back a message w/ wrong protocol version
599 its_lock.unlock();
600 its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
601 VSOMEIP_SOMEIP_HEADER_SIZE + 8, this,
602 boost::asio::ip::address(),
603 VSOMEIP_ROUTING_CLIENT,
604 std::make_pair(ANY_UID, ANY_GID),
605 remote_address_,
606 remote_port_);
607 its_lock.lock();
608 } else if (!utility::is_valid_message_type(static_cast<message_type_e>(
609 (*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]))) {
610 VSOMEIP_ERROR << "tce: Invalid message type: 0x"
611 << std::hex << std::setw(2) << std::setfill('0')
612 << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])
613 << " local: " << get_address_port_local()
614 << " remote: " << get_address_port_remote();
615 } else if (!utility::is_valid_return_code(static_cast<return_code_e>(
616 (*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))) {
617 VSOMEIP_ERROR << "tce: Invalid return code: 0x"
618 << std::hex << std::setw(2) << std::setfill('0')
619 << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS])
620 << " local: " << get_address_port_local()
621 << " remote: " << get_address_port_remote();
622 }
623 state_ = cei_state_e::CONNECTING;
624 shutdown_and_close_socket_unlocked(false);
625 its_lock.unlock();
626
627 // wait_until_sent interprets "no error" as timeout.
628 // Therefore call it with an error.
629 wait_until_sent(boost::asio::error::operation_aborted);
630 return;
631 } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED &&
632 current_message_size > max_message_size_) {
633 _recv_buffer_size = 0;
634 _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
635 _recv_buffer->shrink_to_fit();
636 if (has_enabled_magic_cookies_) {
637 VSOMEIP_ERROR << "Received a TCP message which exceeds "
638 << "maximum message size ("
639 << std::dec << current_message_size
640 << "). Magic Cookies are enabled: "
641 << "Resetting receiver. local: "
642 << get_address_port_local() << " remote: "
643 << get_address_port_remote();
644 } else {
645 VSOMEIP_ERROR << "Received a TCP message which exceeds "
646 << "maximum message size ("
647 << std::dec << current_message_size
648 << ") Magic cookies are disabled, "
649 << "Restarting connection. "
650 << "local: " << get_address_port_local()
651 << " remote: " << get_address_port_remote();
652 state_ = cei_state_e::CONNECTING;
653 shutdown_and_close_socket_unlocked(false);
654 its_lock.unlock();
655
656 // wait_until_sent interprets "no error" as timeout.
657 // Therefore call it with an error.
658 wait_until_sent(boost::asio::error::operation_aborted);
659 return;
660 }
661 } else if (current_message_size > _recv_buffer_size) {
662 its_missing_capacity = current_message_size
663 - static_cast<std::uint32_t>(_recv_buffer_size);
664 } else if (VSOMEIP_SOMEIP_HEADER_SIZE > _recv_buffer_size) {
665 its_missing_capacity = VSOMEIP_SOMEIP_HEADER_SIZE
666 - static_cast<std::uint32_t>(_recv_buffer_size);
667 } else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
668 // no need to check for magic cookie here again: has_full_message
669 // would have been set to true if there was one present in the data
670 _recv_buffer_size = 0;
671 _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
672 _recv_buffer->shrink_to_fit();
673 its_missing_capacity = 0;
674 VSOMEIP_ERROR << "tce::c<" << this
675 << ">rcb: recv_buffer_capacity: "
676 << _recv_buffer->capacity()
677 << " local: " << get_address_port_local()
678 << " remote: " << get_address_port_remote()
679 << ". Didn't find magic cookie in broken data, trying to resync.";
680 } else {
681 VSOMEIP_ERROR << "tce::c<" << this
682 << ">rcb: recv_buffer_size is: " << std::dec
683 << _recv_buffer_size << " but couldn't read "
684 "out message_size. recv_buffer_capacity: "
685 << _recv_buffer->capacity()
686 << " its_iteration_gap: " << its_iteration_gap
687 << " local: " << get_address_port_local()
688 << " remote: " << get_address_port_remote()
689 << ". Restarting connection due to missing/broken data TCP stream.";
690 state_ = cei_state_e::CONNECTING;
691 shutdown_and_close_socket_unlocked(false);
692 its_lock.unlock();
693
694 // wait_until_sent interprets "no error" as timeout.
695 // Therefore call it with an error.
696 wait_until_sent(boost::asio::error::operation_aborted);
697 return;
698 }
699 }
700 } while (has_full_message && _recv_buffer_size);
701 if (its_iteration_gap) {
702 // Copy incomplete message to front for next receive_cbk iteration
703 for (size_t i = 0; i < _recv_buffer_size; ++i) {
704 (*_recv_buffer)[i] = (*_recv_buffer)[i + its_iteration_gap];
705 }
706 // Still more capacity needed after shifting everything to front?
707 if (its_missing_capacity &&
708 its_missing_capacity <= _recv_buffer->capacity() - _recv_buffer_size) {
709 its_missing_capacity = 0;
710 }
711 }
712 its_lock.unlock();
713 auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
714 strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
715 self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
716 });
717 } else {
718 VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
719 << _error.message() << "(" << std::dec << _error.value()
720 << ") local: " << get_address_port_local()
721 << " remote: " << get_address_port_remote();
722 if (_error == boost::asio::error::eof ||
723 _error == boost::asio::error::timed_out ||
724 _error == boost::asio::error::bad_descriptor ||
725 _error == boost::asio::error::connection_reset) {
726 if (state_ == cei_state_e::CONNECTING) {
727 VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk already"
728 " restarting" << get_remote_information();
729 } else {
730 VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk restarting.";
731 state_ = cei_state_e::CONNECTING;
732 shutdown_and_close_socket_unlocked(false);
733 its_lock.unlock();
734
735 // wait_until_sent interprets "no error" as timeout.
736 // Therefore call it with an error.
737 wait_until_sent(boost::asio::error::operation_aborted);
738 }
739 } else {
740 its_lock.unlock();
741 auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
742 strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
743 self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
744 });
745 }
746 }
747 }
748 }
749
calculate_shrink_count(const message_buffer_ptr_t & _recv_buffer,std::size_t _recv_buffer_size)750 void tcp_client_endpoint_impl::calculate_shrink_count(const message_buffer_ptr_t& _recv_buffer,
751 std::size_t _recv_buffer_size) {
752 if (buffer_shrink_threshold_) {
753 if (_recv_buffer->capacity() != recv_buffer_size_initial_) {
754 if (_recv_buffer_size < (_recv_buffer->capacity() >> 1)) {
755 shrink_count_++;
756 } else {
757 shrink_count_ = 0;
758 }
759 }
760 }
761 }
762
763
get_address_port_remote() const764 const std::string tcp_client_endpoint_impl::get_address_port_remote() const {
765 boost::system::error_code ec;
766 std::string its_address_port;
767 its_address_port.reserve(21);
768 boost::asio::ip::address its_address;
769 if (get_remote_address(its_address)) {
770 its_address_port += its_address.to_string();
771 }
772 its_address_port += ":";
773 its_address_port += std::to_string(remote_port_);
774 return its_address_port;
775 }
776
get_address_port_local() const777 const std::string tcp_client_endpoint_impl::get_address_port_local() const {
778 std::string its_address_port;
779 its_address_port.reserve(21);
780 boost::system::error_code ec;
781 if (socket_->is_open()) {
782 endpoint_type its_local_endpoint = socket_->local_endpoint(ec);
783 if (!ec) {
784 its_address_port += its_local_endpoint.address().to_string(ec);
785 its_address_port += ":";
786 its_address_port.append(std::to_string(its_local_endpoint.port()));
787 }
788 }
789 return its_address_port;
790 }
791
handle_recv_buffer_exception(const std::exception & _e,const message_buffer_ptr_t & _recv_buffer,std::size_t _recv_buffer_size)792 void tcp_client_endpoint_impl::handle_recv_buffer_exception(
793 const std::exception &_e,
794 const message_buffer_ptr_t& _recv_buffer,
795 std::size_t _recv_buffer_size) {
796 boost::system::error_code ec;
797
798 std::stringstream its_message;
799 its_message <<"tcp_client_endpoint_impl::connection catched exception"
800 << _e.what() << " local: " << get_address_port_local()
801 << " remote: " << get_address_port_remote()
802 << " shutting down connection. Start of buffer: ";
803
804 for (std::size_t i = 0; i < _recv_buffer_size && i < 16; i++) {
805 its_message << std::setw(2) << std::setfill('0') << std::hex
806 << (int) ((*_recv_buffer)[i]) << " ";
807 }
808
809 its_message << " Last 16 Bytes captured: ";
810 for (int i = 15; _recv_buffer_size > 15 && i >= 0; i--) {
811 its_message << std::setw(2) << std::setfill('0') << std::hex
812 << (int) ((*_recv_buffer)[static_cast<size_t>(i)]) << " ";
813 }
814 VSOMEIP_ERROR << its_message.str();
815 _recv_buffer->clear();
816 {
817 std::lock_guard<std::mutex> its_lock(mutex_);
818 sending_blocked_ = true;
819 }
820 {
821 std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
822 boost::system::error_code ec;
823 connect_timer_.cancel(ec);
824 }
825 if (socket_->is_open()) {
826 boost::system::error_code its_error;
827 socket_->shutdown(socket_type::shutdown_both, its_error);
828 socket_->close(its_error);
829 }
830 }
831
print_status()832 void tcp_client_endpoint_impl::print_status() {
833 std::size_t its_data_size(0);
834 std::size_t its_queue_size(0);
835 std::size_t its_receive_buffer_capacity(0);
836 {
837 std::lock_guard<std::mutex> its_lock(mutex_);
838 its_queue_size = queue_.size();
839 its_data_size = queue_size_;
840 }
841 std::string local;
842 {
843 std::lock_guard<std::mutex> its_lock(socket_mutex_);
844 local = get_address_port_local();
845 its_receive_buffer_capacity = recv_buffer_->capacity();
846 }
847
848 VSOMEIP_INFO << "status tce: " << local << " -> "
849 << get_address_port_remote()
850 << " queue: " << std::dec << its_queue_size
851 << " data: " << std::dec << its_data_size
852 << " recv_buffer: " << std::dec << its_receive_buffer_capacity;
853 }
854
get_remote_information() const855 std::string tcp_client_endpoint_impl::get_remote_information() const {
856 boost::system::error_code ec;
857 return remote_.address().to_string(ec) + ":"
858 + std::to_string(remote_.port());
859 }
860
send_cbk(boost::system::error_code const & _error,std::size_t _bytes,const message_buffer_ptr_t & _sent_msg)861 void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
862 std::size_t _bytes,
863 const message_buffer_ptr_t& _sent_msg) {
864 (void)_bytes;
865
866 {
867 // Signal that the current send operation has finished.
868 // Note: Waiting is always done after having closed the socket.
869 // Therefore, no new send operation will be scheduled.
870 std::lock_guard<std::mutex> its_sent_lock(sent_mutex_);
871 is_sending_ = false;
872
873 boost::system::error_code ec;
874 sent_timer_.cancel(ec);
875 }
876
877 if (!_error) {
878 std::lock_guard<std::mutex> its_lock(mutex_);
879 if (queue_.size() > 0) {
880 queue_size_ -= queue_.front()->size();
881 queue_.pop_front();
882 auto its_buffer = get_front();
883 if (its_buffer) {
884 auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
885 strand_.dispatch(
886 [self, its_buffer]() { self->send_queued(its_buffer);}
887 );
888 }
889 }
890 } else if (_error == boost::system::errc::destination_address_required) {
891 VSOMEIP_WARNING << "tce::send_cbk received error: " << _error.message()
892 << " (" << std::dec << _error.value() << ") "
893 << get_remote_information();
894 was_not_connected_ = true;
895 } else if (_error == boost::asio::error::operation_aborted) {
896 // endpoint was stopped
897 shutdown_and_close_socket(false);
898 } else {
899 if (state_ == cei_state_e::CONNECTING) {
900 VSOMEIP_WARNING << "tce::send_cbk endpoint is already restarting:"
901 << get_remote_information();
902 } else {
903 state_ = cei_state_e::CONNECTING;
904 shutdown_and_close_socket(false);
905 std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
906 if (its_host) {
907 its_host->on_disconnect(shared_from_this());
908 }
909 restart(true);
910 }
911 service_t its_service(0);
912 method_t its_method(0);
913 client_t its_client(0);
914 session_t its_session(0);
915 if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
916 its_service = VSOMEIP_BYTES_TO_WORD(
917 (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
918 (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
919 its_method = VSOMEIP_BYTES_TO_WORD(
920 (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
921 (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
922 its_client = VSOMEIP_BYTES_TO_WORD(
923 (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
924 (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
925 its_session = VSOMEIP_BYTES_TO_WORD(
926 (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
927 (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
928 }
929 VSOMEIP_WARNING << "tce::send_cbk received error: "
930 << _error.message() << " (" << std::dec
931 << _error.value() << ") " << get_remote_information()
932 << " " << std::dec << queue_.size()
933 << " " << std::dec << queue_size_ << " ("
934 << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
935 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
936 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
937 << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
938 }
939 }
940
tp_segmentation_enabled(service_t _service,method_t _method) const941 bool tcp_client_endpoint_impl::tp_segmentation_enabled(service_t _service,
942 method_t _method) const {
943 (void)_service;
944 (void)_method;
945 return false;
946 }
947
get_max_allowed_reconnects() const948 std::uint32_t tcp_client_endpoint_impl::get_max_allowed_reconnects() const {
949 return MAX_RECONNECTS_UNLIMITED;
950 }
951
max_allowed_reconnects_reached()952 void tcp_client_endpoint_impl::max_allowed_reconnects_reached() {
953 return;
954 }
955
wait_until_sent(const boost::system::error_code & _error)956 void tcp_client_endpoint_impl::wait_until_sent(const boost::system::error_code &_error) {
957
958 std::unique_lock<std::mutex> its_sent_lock(sent_mutex_);
959 if (!is_sending_ || !_error) {
960 its_sent_lock.unlock();
961 if (!_error)
962 VSOMEIP_WARNING << __func__
963 << ": Maximum wait time for send operation exceeded for tce.";
964
965 std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
966 its_ep_host->on_disconnect(shared_from_this());
967 restart(true);
968 } else {
969 std::chrono::milliseconds its_timeout(VSOMEIP_MAX_TCP_SENT_WAIT_TIME);
970 boost::system::error_code ec;
971 sent_timer_.expires_from_now(its_timeout, ec);
972 sent_timer_.async_wait(std::bind(&tcp_client_endpoint_impl::wait_until_sent,
973 std::dynamic_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
974 std::placeholders::_1));
975 }
976 }
977
978 } // namespace vsomeip_v3
979