1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include <iomanip>
7 
8 #include <boost/asio/write.hpp>
9 
10 #include <vsomeip/constants.hpp>
11 #include <vsomeip/defines.hpp>
12 #include <vsomeip/internal/logger.hpp>
13 
14 #include "../include/endpoint_host.hpp"
15 #include "../../routing/include/routing_host.hpp"
16 #include "../include/tcp_client_endpoint_impl.hpp"
17 #include "../../utility/include/utility.hpp"
18 #include "../../utility/include/byteorder.hpp"
19 
20 namespace ip = boost::asio::ip;
21 
22 namespace vsomeip_v3 {
23 
tcp_client_endpoint_impl(const std::shared_ptr<endpoint_host> & _endpoint_host,const std::shared_ptr<routing_host> & _routing_host,const endpoint_type & _local,const endpoint_type & _remote,boost::asio::io_service & _io,const std::shared_ptr<configuration> & _configuration)24 tcp_client_endpoint_impl::tcp_client_endpoint_impl(
25         const std::shared_ptr<endpoint_host>& _endpoint_host,
26         const std::shared_ptr<routing_host>& _routing_host,
27         const endpoint_type& _local,
28         const endpoint_type& _remote,
29         boost::asio::io_service &_io,
30         const std::shared_ptr<configuration>& _configuration)
31     : tcp_client_endpoint_base_impl(_endpoint_host, _routing_host, _local,
32                                     _remote, _io,
33                                     _configuration->get_max_message_size_reliable(
34                                             _remote.address().to_string(),
35                                             _remote.port()),
36                                     _configuration->get_endpoint_queue_limit(
37                                                     _remote.address().to_string(),
38                                                     _remote.port()),
39                                     _configuration),
40       recv_buffer_size_initial_(VSOMEIP_SOMEIP_HEADER_SIZE),
41       recv_buffer_(std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0)),
42       shrink_count_(0),
43       buffer_shrink_threshold_(configuration_->get_buffer_shrink_threshold()),
44       remote_address_(_remote.address()),
45       remote_port_(_remote.port()),
46       last_cookie_sent_(std::chrono::steady_clock::now() - std::chrono::seconds(11)),
47       // send timeout after 2/3 of configured ttl, warning after 1/3
48       send_timeout_(configuration_->get_sd_ttl() * 666),
49       send_timeout_warning_(send_timeout_ / 2),
50       tcp_restart_aborts_max_(configuration_->get_max_tcp_restart_aborts()),
51       tcp_connect_time_max_(configuration_->get_max_tcp_connect_time()),
52       aborted_restart_count_(0),
53       is_sending_(false),
54       sent_timer_(_io) {
55 
56     is_supporting_magic_cookies_ = true;
57 }
58 
~tcp_client_endpoint_impl()59 tcp_client_endpoint_impl::~tcp_client_endpoint_impl() {
60     std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
61     if (its_host) {
62         its_host->release_port(local_.port(), true);
63     }
64 }
65 
is_local() const66 bool tcp_client_endpoint_impl::is_local() const {
67     return false;
68 }
69 
start()70 void tcp_client_endpoint_impl::start() {
71     strand_.dispatch(std::bind(&client_endpoint_impl::connect,
72             this->shared_from_this()));
73 }
74 
restart(bool _force)75 void tcp_client_endpoint_impl::restart(bool _force) {
76     auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
77     auto restart_func = [self, _force] {
78         if (!_force && self->state_ == cei_state_e::CONNECTING) {
79             std::chrono::steady_clock::time_point its_current
80                 = std::chrono::steady_clock::now();
81             long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
82                     its_current - self->connect_timepoint_).count();
83             if (self->aborted_restart_count_ < self->tcp_restart_aborts_max_
84                     && its_connect_duration < self->tcp_connect_time_max_) {
85                 self->aborted_restart_count_++;
86                 return;
87             } else {
88                 VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
89                         << self->tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
90                         << its_connect_duration;
91             }
92         }
93         self->state_ = cei_state_e::CONNECTING;
94         std::string address_port_local;
95         {
96             std::lock_guard<std::mutex> its_lock(self->socket_mutex_);
97             address_port_local = self->get_address_port_local();
98             self->shutdown_and_close_socket_unlocked(true);
99             self->recv_buffer_ = std::make_shared<message_buffer_t>(self->recv_buffer_size_initial_, 0);
100         }
101         self->was_not_connected_ = true;
102         self->reconnect_counter_ = 0;
103         {
104             std::lock_guard<std::mutex> its_lock(self->mutex_);
105             for (const auto&m : self->queue_) {
106                 const service_t its_service = VSOMEIP_BYTES_TO_WORD(
107                         (*m)[VSOMEIP_SERVICE_POS_MIN],
108                         (*m)[VSOMEIP_SERVICE_POS_MAX]);
109                 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
110                         (*m)[VSOMEIP_METHOD_POS_MIN],
111                         (*m)[VSOMEIP_METHOD_POS_MAX]);
112                 const client_t its_client = VSOMEIP_BYTES_TO_WORD(
113                         (*m)[VSOMEIP_CLIENT_POS_MIN],
114                         (*m)[VSOMEIP_CLIENT_POS_MAX]);
115                 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
116                         (*m)[VSOMEIP_SESSION_POS_MIN],
117                         (*m)[VSOMEIP_SESSION_POS_MAX]);
118                 VSOMEIP_WARNING << "tce::restart: dropping message: "
119                         << "remote:" << self->get_address_port_remote() << " ("
120                         << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
121                         << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
122                         << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
123                         << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
124                         << " size: " << std::dec << m->size();
125             }
126             self->queue_.clear();
127             self->queue_size_ = 0;
128         }
129         VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
130                 << " remote: " << self->get_address_port_remote();
131         self->start_connect_timer();
132     };
133     // bind to strand_ to avoid socket closure if
134     // parallel socket operation is currently active
135     strand_.dispatch(restart_func);
136 }
137 
connect()138 void tcp_client_endpoint_impl::connect() {
139     std::lock_guard<std::mutex> its_lock(socket_mutex_);
140     boost::system::error_code its_error;
141     socket_->open(remote_.protocol(), its_error);
142 
143     if (!its_error || its_error == boost::asio::error::already_open) {
144         // Nagle algorithm off
145         socket_->set_option(ip::tcp::no_delay(true), its_error);
146         if (its_error) {
147             VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't disable "
148                     << "Nagle algorithm: " << its_error.message()
149                     << " remote:" << get_address_port_remote();
150         }
151 
152         socket_->set_option(boost::asio::socket_base::keep_alive(true), its_error);
153         if (its_error) {
154             VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
155                     << "keep_alive: " << its_error.message()
156                     << " remote:" << get_address_port_remote();
157         }
158 
159         // Enable SO_REUSEADDR to avoid bind problems with services going offline
160         // and coming online again and the user has specified only a small number
161         // of ports in the clients section for one service instance
162         socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
163         if (its_error) {
164             VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
165                     << "SO_REUSEADDR: " << its_error.message()
166                     << " remote:" << get_address_port_remote();
167         }
168         socket_->set_option(boost::asio::socket_base::linger(true, 0), its_error);
169         if (its_error) {
170             VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
171                     << "SO_LINGER: " << its_error.message()
172                     << " remote:" << get_address_port_remote();
173         }
174 
175 #ifndef _WIN32
176         // If specified, bind to device
177         std::string its_device(configuration_->get_device());
178         if (its_device != "") {
179             if (setsockopt(socket_->native_handle(),
180                     SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
181                 VSOMEIP_WARNING << "TCP Client: Could not bind to device \"" << its_device << "\"";
182             }
183         }
184 #endif
185 
186         // In case a client endpoint port was configured,
187         // bind to it before connecting
188         if (local_.port() != ILLEGAL_PORT) {
189             boost::system::error_code its_bind_error;
190             socket_->bind(local_, its_bind_error);
191             if(its_bind_error) {
192                 VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
193                         "Error binding socket: " << its_bind_error.message()
194                         << " local: " << local_.address().to_string()
195                         << ":" << std::dec << local_.port()
196                         << " remote:" << get_address_port_remote();
197 
198                 std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
199                 if (its_host) {
200                     // set new client port depending on service / instance / remote port
201                     if (!its_host->on_bind_error(shared_from_this(), remote_port_)) {
202                         VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
203                                 "Failed to set new local port for tce: "
204                                 << " local: " << local_.address().to_string()
205                                 << ":" << std::dec << local_.port()
206                                 << " remote:" << get_address_port_remote();
207                     } else {
208                         local_.port(local_port_);
209                         VSOMEIP_INFO << "tcp_client_endpoint::connect: "
210                                 "Using new new local port for tce: "
211                                 << " local: " << local_.address().to_string()
212                                 << ":" << std::dec << local_.port()
213                                 << " remote:" << get_address_port_remote();
214                     }
215                 }
216                 try {
217                     // don't connect on bind error to avoid using a random port
218                     strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
219                                     shared_from_this(), its_bind_error));
220                 } catch (const std::exception &e) {
221                     VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
222                             << e.what()
223                             << " local: " << local_.address().to_string()
224                             << ":" << std::dec << local_.port()
225                             << " remote:" << get_address_port_remote();
226                 }
227                 return;
228             }
229             return;
230         }
231 
232         state_ = cei_state_e::CONNECTING;
233         connect_timepoint_ = std::chrono::steady_clock::now();
234         aborted_restart_count_ = 0;
235         socket_->async_connect(
236             remote_,
237             strand_.wrap(
238                 std::bind(
239                     &tcp_client_endpoint_base_impl::connect_cbk,
240                     shared_from_this(),
241                     std::placeholders::_1
242                 )
243             )
244         );
245     } else {
246         VSOMEIP_WARNING << "tcp_client_endpoint::connect: Error opening socket: "
247                 << its_error.message() << " remote:" << get_address_port_remote();
248         strand_.post(std::bind(&tcp_client_endpoint_base_impl::connect_cbk,
249                                 shared_from_this(), its_error));
250     }
251 }
252 
receive()253 void tcp_client_endpoint_impl::receive() {
254     message_buffer_ptr_t its_recv_buffer;
255     {
256         std::lock_guard<std::mutex> its_lock(socket_mutex_);
257         its_recv_buffer = recv_buffer_;
258     }
259     auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
260     strand_.dispatch([self, &its_recv_buffer](){
261         self->receive(its_recv_buffer, 0, 0);
262     });
263 }
264 
receive(message_buffer_ptr_t _recv_buffer,std::size_t _recv_buffer_size,std::size_t _missing_capacity)265 void tcp_client_endpoint_impl::receive(message_buffer_ptr_t  _recv_buffer,
266              std::size_t _recv_buffer_size,
267              std::size_t _missing_capacity) {
268     std::lock_guard<std::mutex> its_lock(socket_mutex_);
269     if(socket_->is_open()) {
270         const std::size_t its_capacity(_recv_buffer->capacity());
271         size_t buffer_size = its_capacity - _recv_buffer_size;
272         try {
273             if (_missing_capacity) {
274                 if (_missing_capacity > MESSAGE_SIZE_UNLIMITED) {
275                     VSOMEIP_ERROR << "Missing receive buffer capacity exceeds allowed maximum!";
276                     return;
277                 }
278                 const std::size_t its_required_capacity(_recv_buffer_size + _missing_capacity);
279                 if (its_capacity < its_required_capacity) {
280                     _recv_buffer->reserve(its_required_capacity);
281                     _recv_buffer->resize(its_required_capacity, 0x0);
282                     if (_recv_buffer->size() > 1048576) {
283                         VSOMEIP_INFO << "tce: recv_buffer size is: " <<
284                                 _recv_buffer->size()
285                                 << " local: " << get_address_port_local()
286                                 << " remote: " << get_address_port_remote();
287                     }
288                 }
289                 buffer_size = _missing_capacity;
290             } else if (buffer_shrink_threshold_
291                     && shrink_count_ > buffer_shrink_threshold_
292                     && _recv_buffer_size == 0) {
293                 _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
294                 _recv_buffer->shrink_to_fit();
295                 buffer_size = recv_buffer_size_initial_;
296                 shrink_count_ = 0;
297             }
298         } catch (const std::exception &e) {
299             handle_recv_buffer_exception(e, _recv_buffer, _recv_buffer_size);
300             // don't start receiving again
301             return;
302         }
303         socket_->async_receive(
304             boost::asio::buffer(&(*_recv_buffer)[_recv_buffer_size], buffer_size),
305             strand_.wrap(
306                 std::bind(
307                     &tcp_client_endpoint_impl::receive_cbk,
308                     std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()),
309                     std::placeholders::_1,
310                     std::placeholders::_2,
311                     _recv_buffer,
312                     _recv_buffer_size
313                 )
314             )
315         );
316     }
317 }
318 
send_queued(message_buffer_ptr_t _buffer)319 void tcp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
320     const service_t its_service = VSOMEIP_BYTES_TO_WORD(
321             (*_buffer)[VSOMEIP_SERVICE_POS_MIN],
322             (*_buffer)[VSOMEIP_SERVICE_POS_MAX]);
323     const method_t its_method = VSOMEIP_BYTES_TO_WORD(
324             (*_buffer)[VSOMEIP_METHOD_POS_MIN],
325             (*_buffer)[VSOMEIP_METHOD_POS_MAX]);
326     const client_t its_client = VSOMEIP_BYTES_TO_WORD(
327             (*_buffer)[VSOMEIP_CLIENT_POS_MIN],
328             (*_buffer)[VSOMEIP_CLIENT_POS_MAX]);
329     const session_t its_session = VSOMEIP_BYTES_TO_WORD(
330             (*_buffer)[VSOMEIP_SESSION_POS_MIN],
331             (*_buffer)[VSOMEIP_SESSION_POS_MAX]);
332 
333     if (has_enabled_magic_cookies_) {
334         const std::chrono::steady_clock::time_point now =
335                 std::chrono::steady_clock::now();
336         if (std::chrono::duration_cast<std::chrono::milliseconds>(
337                 now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
338             send_magic_cookie(_buffer);
339             last_cookie_sent_ = now;
340         }
341     }
342 
343 
344 #if 0
345     std::stringstream msg;
346     msg << "tcei<" << remote_.address() << ":"
347         << std::dec << remote_.port()  << ">::sq: ";
348     for (std::size_t i = 0; i < _buffer->size(); i++)
349         msg << std::hex << std::setw(2) << std::setfill('0')
350             << (int)(*_buffer)[i] << " ";
351     VSOMEIP_INFO << msg.str();
352 #endif
353     {
354         std::lock_guard<std::mutex> its_lock(socket_mutex_);
355         if (socket_->is_open()) {
356             {
357                 std::lock_guard<std::mutex> its_sent_lock(sent_mutex_);
358                 is_sending_ = true;
359             }
360             boost::asio::async_write(
361                 *socket_,
362                 boost::asio::buffer(*_buffer),
363                 std::bind(
364                     &tcp_client_endpoint_impl::write_completion_condition,
365                     std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
366                     std::placeholders::_1,
367                     std::placeholders::_2,
368                     _buffer->size(),
369                     its_service, its_method, its_client, its_session,
370                     std::chrono::steady_clock::now()),
371                 strand_.wrap(
372                     std::bind(
373                     &tcp_client_endpoint_base_impl::send_cbk,
374                     shared_from_this(),
375                     std::placeholders::_1,
376                     std::placeholders::_2,
377                     _buffer
378                 ))
379             );
380         }
381     }
382 }
383 
get_configured_times_from_endpoint(service_t _service,method_t _method,std::chrono::nanoseconds * _debouncing,std::chrono::nanoseconds * _maximum_retention) const384 void tcp_client_endpoint_impl::get_configured_times_from_endpoint(
385         service_t _service, method_t _method,
386         std::chrono::nanoseconds *_debouncing,
387         std::chrono::nanoseconds *_maximum_retention) const {
388     configuration_->get_configured_timing_requests(_service,
389             remote_address_.to_string(), remote_port_, _method,
390             _debouncing, _maximum_retention);
391 }
392 
get_remote_address(boost::asio::ip::address & _address) const393 bool tcp_client_endpoint_impl::get_remote_address(
394         boost::asio::ip::address &_address) const {
395     if (remote_address_.is_unspecified()) {
396         return false;
397     }
398     _address = remote_address_;
399     return true;
400 }
401 
set_local_port()402 void tcp_client_endpoint_impl::set_local_port() {
403     std::lock_guard<std::mutex> its_lock(socket_mutex_);
404     boost::system::error_code its_error;
405     if (socket_->is_open()) {
406         endpoint_type its_endpoint = socket_->local_endpoint(its_error);
407         if (!its_error) {
408             local_port_ = its_endpoint.port();
409         } else {
410             VSOMEIP_WARNING << "tcp_client_endpoint_impl::set_local_port() "
411                     << " couldn't get local_endpoint: " << its_error.message();
412         }
413     }
414 }
415 
write_completion_condition(const boost::system::error_code & _error,std::size_t _bytes_transferred,std::size_t _bytes_to_send,service_t _service,method_t _method,client_t _client,session_t _session,const std::chrono::steady_clock::time_point _start)416 std::size_t tcp_client_endpoint_impl::write_completion_condition(
417         const boost::system::error_code& _error, std::size_t _bytes_transferred,
418         std::size_t _bytes_to_send, service_t _service, method_t _method,
419         client_t _client, session_t _session,
420         const std::chrono::steady_clock::time_point _start) {
421 
422     if (_error) {
423         VSOMEIP_ERROR << "tce::write_completion_condition: "
424                 << _error.message() << "(" << std::dec << _error.value()
425                 << ") bytes transferred: " << std::dec << _bytes_transferred
426                 << " bytes to sent: " << std::dec << _bytes_to_send << " "
427                 << "remote:" << get_address_port_remote() << " ("
428                 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
429                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
430                 << std::hex << std::setw(4) << std::setfill('0') << _method << "."
431                 << std::hex << std::setw(4) << std::setfill('0') << _session << "]";
432         return 0;
433     }
434 
435     const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
436     const std::chrono::milliseconds passed = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start);
437     if (passed > send_timeout_warning_) {
438         if (passed > send_timeout_) {
439             VSOMEIP_ERROR << "tce::write_completion_condition: "
440                     << _error.message() << "(" << std::dec << _error.value()
441                     << ") took longer than " << std::dec << send_timeout_.count()
442                     << "ms bytes transferred: " << std::dec << _bytes_transferred
443                     << " bytes to sent: " << std::dec << _bytes_to_send << " "
444                     << "remote:" << get_address_port_remote() << " ("
445                     << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
446                     << std::hex << std::setw(4) << std::setfill('0') << _service << "."
447                     << std::hex << std::setw(4) << std::setfill('0') << _method << "."
448                     << std::hex << std::setw(4) << std::setfill('0') << _session << "]";
449         } else {
450             VSOMEIP_WARNING << "tce::write_completion_condition: "
451                     << _error.message() << "(" << std::dec << _error.value()
452                     << ") took longer than " << std::dec << send_timeout_warning_.count()
453                     << "ms bytes transferred: " << std::dec << _bytes_transferred
454                     << " bytes to sent: " << std::dec << _bytes_to_send << " "
455                     << "remote:" << get_address_port_remote() << " ("
456                     << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
457                     << std::hex << std::setw(4) << std::setfill('0') << _service << "."
458                     << std::hex << std::setw(4) << std::setfill('0') << _method << "."
459                     << std::hex << std::setw(4) << std::setfill('0') << _session << "]";
460         }
461     }
462     return _bytes_to_send - _bytes_transferred;
463 }
464 
get_remote_port() const465 std::uint16_t tcp_client_endpoint_impl::get_remote_port() const {
466     return remote_port_;
467 }
468 
is_reliable() const469 bool tcp_client_endpoint_impl::is_reliable() const {
470   return true;
471 }
472 
is_magic_cookie(const message_buffer_ptr_t & _recv_buffer,size_t _offset) const473 bool tcp_client_endpoint_impl::is_magic_cookie(const message_buffer_ptr_t& _recv_buffer,
474                                                size_t _offset) const {
475     return (0 == std::memcmp(SERVICE_COOKIE, &(*_recv_buffer)[_offset], sizeof(SERVICE_COOKIE)));
476 }
477 
send_magic_cookie(message_buffer_ptr_t & _buffer)478 void tcp_client_endpoint_impl::send_magic_cookie(message_buffer_ptr_t &_buffer) {
479     if (max_message_size_ == MESSAGE_SIZE_UNLIMITED
480             || max_message_size_ - _buffer->size() >=
481         VSOMEIP_SOMEIP_HEADER_SIZE + VSOMEIP_SOMEIP_MAGIC_COOKIE_SIZE) {
482         _buffer->insert(
483             _buffer->begin(),
484             CLIENT_COOKIE,
485             CLIENT_COOKIE + sizeof(CLIENT_COOKIE)
486         );
487         queue_size_ += sizeof(CLIENT_COOKIE);
488     } else {
489         VSOMEIP_WARNING << "Packet full. Cannot insert magic cookie!";
490     }
491 }
492 
receive_cbk(boost::system::error_code const & _error,std::size_t _bytes,const message_buffer_ptr_t & _recv_buffer,std::size_t _recv_buffer_size)493 void tcp_client_endpoint_impl::receive_cbk(
494         boost::system::error_code const &_error, std::size_t _bytes,
495         const message_buffer_ptr_t& _recv_buffer, std::size_t _recv_buffer_size) {
496     if (_error == boost::asio::error::operation_aborted) {
497         // endpoint was stopped
498         return;
499     }
500 #if 0
501     std::stringstream msg;
502     msg << "cei::rcb (" << _error.message() << "): ";
503     for (std::size_t i = 0; i < _bytes + _recv_buffer_size; ++i)
504         msg << std::hex << std::setw(2) << std::setfill('0')
505             << (int) (_recv_buffer)[i] << " ";
506     VSOMEIP_INFO << msg.str();
507 #endif
508     std::unique_lock<std::mutex> its_lock(socket_mutex_);
509     std::shared_ptr<routing_host> its_host = routing_host_.lock();
510     if (its_host) {
511         std::uint32_t its_missing_capacity(0);
512         if (!_error && 0 < _bytes) {
513             if (_recv_buffer_size + _bytes < _recv_buffer_size) {
514                 VSOMEIP_ERROR << "receive buffer overflow in tcp client endpoint ~> abort!";
515                 return;
516             }
517             _recv_buffer_size += _bytes;
518 
519             size_t its_iteration_gap = 0;
520             bool has_full_message(false);
521             do {
522                 uint64_t read_message_size
523                     = utility::get_message_size(&(*_recv_buffer)[its_iteration_gap],
524                             _recv_buffer_size);
525                 if (read_message_size > MESSAGE_SIZE_UNLIMITED) {
526                     VSOMEIP_ERROR << "Message size exceeds allowed maximum!";
527                     return;
528                 }
529                 uint32_t current_message_size = static_cast<uint32_t>(read_message_size);
530                 has_full_message = (current_message_size > VSOMEIP_RETURN_CODE_POS
531                                  && current_message_size <= _recv_buffer_size);
532                 if (has_full_message) {
533                     bool needs_forwarding(true);
534                     if (is_magic_cookie(_recv_buffer, its_iteration_gap)) {
535                         has_enabled_magic_cookies_ = true;
536                     } else {
537                         if (has_enabled_magic_cookies_) {
538                             uint32_t its_offset = find_magic_cookie(&(*_recv_buffer)[its_iteration_gap],
539                                     (uint32_t) _recv_buffer_size);
540                             if (its_offset < current_message_size) {
541                                 VSOMEIP_ERROR << "Message includes Magic Cookie. Ignoring it.";
542                                 current_message_size = its_offset;
543                                 needs_forwarding = false;
544                             }
545                         }
546                     }
547                     if (needs_forwarding) {
548                         if (!has_enabled_magic_cookies_) {
549                             its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
550                                                  current_message_size, this,
551                                                  boost::asio::ip::address(),
552                                                  VSOMEIP_ROUTING_CLIENT,
553                                                  std::make_pair(ANY_UID, ANY_GID),
554                                                  remote_address_,
555                                                  remote_port_);
556                         } else {
557                             // Only call on_message without a magic cookie in front of the buffer!
558                             if (!is_magic_cookie(_recv_buffer, its_iteration_gap)) {
559                                 its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
560                                                      current_message_size, this,
561                                                      boost::asio::ip::address(),
562                                                      VSOMEIP_ROUTING_CLIENT,
563                                                      std::make_pair(ANY_UID, ANY_GID),
564                                                      remote_address_,
565                                                      remote_port_);
566                             }
567                         }
568                     }
569                     calculate_shrink_count(_recv_buffer, _recv_buffer_size);
570                     _recv_buffer_size -= current_message_size;
571                     its_iteration_gap += current_message_size;
572                     its_missing_capacity = 0;
573                 } else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
574                     const uint32_t its_offset = find_magic_cookie(
575                             &(*_recv_buffer)[its_iteration_gap], _recv_buffer_size);
576                     if (its_offset < _recv_buffer_size) {
577                         _recv_buffer_size -= its_offset;
578                         its_iteration_gap += its_offset;
579                         has_full_message = true; // trigger next loop
580                         VSOMEIP_ERROR << "Detected Magic Cookie within message data."
581                             << " Resyncing. local: " << get_address_port_local()
582                             << " remote: " << get_address_port_remote();
583                     }
584                 }
585 
586                 if (!has_full_message) {
587                     if (_recv_buffer_size > VSOMEIP_RETURN_CODE_POS &&
588                         ((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
589                          !utility::is_valid_message_type(static_cast<message_type_e>((*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])) ||
590                          !utility::is_valid_return_code(static_cast<return_code_e>((*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))
591                         )) {
592                         if ((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
593                             VSOMEIP_ERROR << "tce: Wrong protocol version: 0x"
594                                     << std::hex << std::setw(2) << std::setfill('0')
595                                     << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS])
596                                     << " local: " << get_address_port_local()
597                                     << " remote: " << get_address_port_remote();
598                             // ensure to send back a message w/ wrong protocol version
599                             its_lock.unlock();
600                             its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
601                                                  VSOMEIP_SOMEIP_HEADER_SIZE + 8, this,
602                                                  boost::asio::ip::address(),
603                                                  VSOMEIP_ROUTING_CLIENT,
604                                                  std::make_pair(ANY_UID, ANY_GID),
605                                                  remote_address_,
606                                                  remote_port_);
607                             its_lock.lock();
608                         } else if (!utility::is_valid_message_type(static_cast<message_type_e>(
609                                 (*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]))) {
610                             VSOMEIP_ERROR << "tce: Invalid message type: 0x"
611                                     << std::hex << std::setw(2) << std::setfill('0')
612                                     << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])
613                                     << " local: " << get_address_port_local()
614                                     << " remote: " << get_address_port_remote();
615                         } else if (!utility::is_valid_return_code(static_cast<return_code_e>(
616                                 (*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))) {
617                             VSOMEIP_ERROR << "tce: Invalid return code: 0x"
618                                     << std::hex << std::setw(2) << std::setfill('0')
619                                     << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS])
620                                     << " local: " << get_address_port_local()
621                                     << " remote: " << get_address_port_remote();
622                         }
623                         state_ = cei_state_e::CONNECTING;
624                         shutdown_and_close_socket_unlocked(false);
625                         its_lock.unlock();
626 
627                         // wait_until_sent interprets "no error" as timeout.
628                         // Therefore call it with an error.
629                         wait_until_sent(boost::asio::error::operation_aborted);
630                         return;
631                     } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED &&
632                             current_message_size > max_message_size_) {
633                         _recv_buffer_size = 0;
634                         _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
635                         _recv_buffer->shrink_to_fit();
636                         if (has_enabled_magic_cookies_) {
637                             VSOMEIP_ERROR << "Received a TCP message which exceeds "
638                                           << "maximum message size ("
639                                           << std::dec << current_message_size
640                                           << "). Magic Cookies are enabled: "
641                                           << "Resetting receiver. local: "
642                                           << get_address_port_local() << " remote: "
643                                           << get_address_port_remote();
644                         } else {
645                             VSOMEIP_ERROR << "Received a TCP message which exceeds "
646                                           << "maximum message size ("
647                                           << std::dec << current_message_size
648                                           << ") Magic cookies are disabled, "
649                                           << "Restarting connection. "
650                                           << "local: " << get_address_port_local()
651                                           << " remote: " << get_address_port_remote();
652                             state_ = cei_state_e::CONNECTING;
653                             shutdown_and_close_socket_unlocked(false);
654                             its_lock.unlock();
655 
656                             // wait_until_sent interprets "no error" as timeout.
657                             // Therefore call it with an error.
658                             wait_until_sent(boost::asio::error::operation_aborted);
659                             return;
660                         }
661                     } else if (current_message_size > _recv_buffer_size) {
662                             its_missing_capacity = current_message_size
663                                     - static_cast<std::uint32_t>(_recv_buffer_size);
664                     } else if (VSOMEIP_SOMEIP_HEADER_SIZE > _recv_buffer_size) {
665                             its_missing_capacity = VSOMEIP_SOMEIP_HEADER_SIZE
666                                     - static_cast<std::uint32_t>(_recv_buffer_size);
667                     } else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
668                         // no need to check for magic cookie here again: has_full_message
669                         // would have been set to true if there was one present in the data
670                         _recv_buffer_size = 0;
671                         _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
672                         _recv_buffer->shrink_to_fit();
673                         its_missing_capacity = 0;
674                         VSOMEIP_ERROR << "tce::c<" << this
675                                 << ">rcb: recv_buffer_capacity: "
676                                 << _recv_buffer->capacity()
677                                 << " local: " << get_address_port_local()
678                                 << " remote: " << get_address_port_remote()
679                                 << ". Didn't find magic cookie in broken data, trying to resync.";
680                     } else {
681                         VSOMEIP_ERROR << "tce::c<" << this
682                                 << ">rcb: recv_buffer_size is: " << std::dec
683                                 << _recv_buffer_size << " but couldn't read "
684                                 "out message_size. recv_buffer_capacity: "
685                                 << _recv_buffer->capacity()
686                                 << " its_iteration_gap: " << its_iteration_gap
687                                 << " local: " << get_address_port_local()
688                                 << " remote: " << get_address_port_remote()
689                                 << ". Restarting connection due to missing/broken data TCP stream.";
690                         state_ = cei_state_e::CONNECTING;
691                         shutdown_and_close_socket_unlocked(false);
692                         its_lock.unlock();
693 
694                         // wait_until_sent interprets "no error" as timeout.
695                         // Therefore call it with an error.
696                         wait_until_sent(boost::asio::error::operation_aborted);
697                         return;
698                     }
699                 }
700             } while (has_full_message && _recv_buffer_size);
701             if (its_iteration_gap) {
702                 // Copy incomplete message to front for next receive_cbk iteration
703                 for (size_t i = 0; i < _recv_buffer_size; ++i) {
704                     (*_recv_buffer)[i] = (*_recv_buffer)[i + its_iteration_gap];
705                 }
706                 // Still more capacity needed after shifting everything to front?
707                 if (its_missing_capacity &&
708                         its_missing_capacity <= _recv_buffer->capacity() - _recv_buffer_size) {
709                     its_missing_capacity = 0;
710                 }
711             }
712             its_lock.unlock();
713             auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
714             strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
715                 self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
716             });
717         } else {
718             VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
719                     << _error.message() << "(" << std::dec << _error.value()
720                     << ") local: " << get_address_port_local()
721                     << " remote: " << get_address_port_remote();
722             if (_error ==  boost::asio::error::eof ||
723                     _error == boost::asio::error::timed_out ||
724                     _error == boost::asio::error::bad_descriptor ||
725                     _error == boost::asio::error::connection_reset) {
726                 if (state_ == cei_state_e::CONNECTING) {
727                     VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk already"
728                             " restarting" << get_remote_information();
729                 } else {
730                     VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk restarting.";
731                     state_ = cei_state_e::CONNECTING;
732                     shutdown_and_close_socket_unlocked(false);
733                     its_lock.unlock();
734 
735                     // wait_until_sent interprets "no error" as timeout.
736                     // Therefore call it with an error.
737                     wait_until_sent(boost::asio::error::operation_aborted);
738                 }
739             } else {
740                 its_lock.unlock();
741                 auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
742                 strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
743                     self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
744                 });
745             }
746         }
747     }
748 }
749 
calculate_shrink_count(const message_buffer_ptr_t & _recv_buffer,std::size_t _recv_buffer_size)750 void tcp_client_endpoint_impl::calculate_shrink_count(const message_buffer_ptr_t& _recv_buffer,
751                                                       std::size_t _recv_buffer_size) {
752     if (buffer_shrink_threshold_) {
753         if (_recv_buffer->capacity() != recv_buffer_size_initial_) {
754             if (_recv_buffer_size < (_recv_buffer->capacity() >> 1)) {
755                 shrink_count_++;
756             } else {
757                 shrink_count_ = 0;
758             }
759         }
760     }
761 }
762 
763 
get_address_port_remote() const764 const std::string tcp_client_endpoint_impl::get_address_port_remote() const {
765     boost::system::error_code ec;
766     std::string its_address_port;
767     its_address_port.reserve(21);
768     boost::asio::ip::address its_address;
769     if (get_remote_address(its_address)) {
770         its_address_port += its_address.to_string();
771     }
772     its_address_port += ":";
773     its_address_port += std::to_string(remote_port_);
774     return its_address_port;
775 }
776 
get_address_port_local() const777 const std::string tcp_client_endpoint_impl::get_address_port_local() const {
778     std::string its_address_port;
779     its_address_port.reserve(21);
780     boost::system::error_code ec;
781     if (socket_->is_open()) {
782         endpoint_type its_local_endpoint = socket_->local_endpoint(ec);
783         if (!ec) {
784             its_address_port += its_local_endpoint.address().to_string(ec);
785             its_address_port += ":";
786             its_address_port.append(std::to_string(its_local_endpoint.port()));
787         }
788     }
789     return its_address_port;
790 }
791 
handle_recv_buffer_exception(const std::exception & _e,const message_buffer_ptr_t & _recv_buffer,std::size_t _recv_buffer_size)792 void tcp_client_endpoint_impl::handle_recv_buffer_exception(
793         const std::exception &_e,
794         const message_buffer_ptr_t& _recv_buffer,
795         std::size_t _recv_buffer_size) {
796     boost::system::error_code ec;
797 
798     std::stringstream its_message;
799     its_message <<"tcp_client_endpoint_impl::connection catched exception"
800             << _e.what() << " local: " << get_address_port_local()
801             << " remote: " << get_address_port_remote()
802             << " shutting down connection. Start of buffer: ";
803 
804     for (std::size_t i = 0; i < _recv_buffer_size && i < 16; i++) {
805         its_message << std::setw(2) << std::setfill('0') << std::hex
806             << (int) ((*_recv_buffer)[i]) << " ";
807     }
808 
809     its_message << " Last 16 Bytes captured: ";
810     for (int i = 15; _recv_buffer_size > 15 && i >= 0; i--) {
811         its_message << std::setw(2) << std::setfill('0') << std::hex
812             << (int) ((*_recv_buffer)[static_cast<size_t>(i)]) << " ";
813     }
814     VSOMEIP_ERROR << its_message.str();
815     _recv_buffer->clear();
816     {
817         std::lock_guard<std::mutex> its_lock(mutex_);
818         sending_blocked_ = true;
819     }
820     {
821         std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
822         boost::system::error_code ec;
823         connect_timer_.cancel(ec);
824     }
825     if (socket_->is_open()) {
826         boost::system::error_code its_error;
827         socket_->shutdown(socket_type::shutdown_both, its_error);
828         socket_->close(its_error);
829     }
830 }
831 
print_status()832 void tcp_client_endpoint_impl::print_status() {
833     std::size_t its_data_size(0);
834     std::size_t its_queue_size(0);
835     std::size_t its_receive_buffer_capacity(0);
836     {
837         std::lock_guard<std::mutex> its_lock(mutex_);
838         its_queue_size = queue_.size();
839         its_data_size = queue_size_;
840     }
841     std::string local;
842     {
843         std::lock_guard<std::mutex> its_lock(socket_mutex_);
844         local = get_address_port_local();
845         its_receive_buffer_capacity = recv_buffer_->capacity();
846     }
847 
848     VSOMEIP_INFO << "status tce: " << local << " -> "
849             << get_address_port_remote()
850             << " queue: " << std::dec << its_queue_size
851             << " data: " << std::dec << its_data_size
852             << " recv_buffer: " << std::dec << its_receive_buffer_capacity;
853 }
854 
get_remote_information() const855 std::string tcp_client_endpoint_impl::get_remote_information() const {
856     boost::system::error_code ec;
857     return remote_.address().to_string(ec) + ":"
858             + std::to_string(remote_.port());
859 }
860 
send_cbk(boost::system::error_code const & _error,std::size_t _bytes,const message_buffer_ptr_t & _sent_msg)861 void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
862                                         std::size_t _bytes,
863                                         const message_buffer_ptr_t& _sent_msg) {
864     (void)_bytes;
865 
866     {
867         // Signal that the current send operation has finished.
868         // Note: Waiting is always done after having closed the socket.
869         //       Therefore, no new send operation will be scheduled.
870         std::lock_guard<std::mutex> its_sent_lock(sent_mutex_);
871         is_sending_ = false;
872 
873         boost::system::error_code ec;
874         sent_timer_.cancel(ec);
875     }
876 
877     if (!_error) {
878         std::lock_guard<std::mutex> its_lock(mutex_);
879         if (queue_.size() > 0) {
880             queue_size_ -= queue_.front()->size();
881             queue_.pop_front();
882             auto its_buffer = get_front();
883             if (its_buffer) {
884                 auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
885                 strand_.dispatch(
886                     [self, its_buffer]() { self->send_queued(its_buffer);}
887                 );
888             }
889         }
890     } else if (_error == boost::system::errc::destination_address_required) {
891         VSOMEIP_WARNING << "tce::send_cbk received error: " << _error.message()
892                 << " (" << std::dec << _error.value() << ") "
893                 << get_remote_information();
894         was_not_connected_ = true;
895     } else if (_error == boost::asio::error::operation_aborted) {
896         // endpoint was stopped
897         shutdown_and_close_socket(false);
898     } else {
899         if (state_ == cei_state_e::CONNECTING) {
900             VSOMEIP_WARNING << "tce::send_cbk endpoint is already restarting:"
901                     << get_remote_information();
902         } else {
903             state_ = cei_state_e::CONNECTING;
904             shutdown_and_close_socket(false);
905             std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
906             if (its_host) {
907                 its_host->on_disconnect(shared_from_this());
908             }
909             restart(true);
910         }
911         service_t its_service(0);
912         method_t its_method(0);
913         client_t its_client(0);
914         session_t its_session(0);
915         if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
916             its_service = VSOMEIP_BYTES_TO_WORD(
917                     (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
918                     (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
919             its_method = VSOMEIP_BYTES_TO_WORD(
920                     (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
921                     (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
922             its_client = VSOMEIP_BYTES_TO_WORD(
923                     (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
924                     (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
925             its_session = VSOMEIP_BYTES_TO_WORD(
926                     (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
927                     (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
928         }
929         VSOMEIP_WARNING << "tce::send_cbk received error: "
930                 << _error.message() << " (" << std::dec
931                 << _error.value() << ") " << get_remote_information()
932                 << " " << std::dec << queue_.size()
933                 << " " << std::dec << queue_size_ << " ("
934                 << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
935                 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
936                 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
937                 << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
938     }
939 }
940 
tp_segmentation_enabled(service_t _service,method_t _method) const941 bool tcp_client_endpoint_impl::tp_segmentation_enabled(service_t _service,
942                                                        method_t _method) const {
943     (void)_service;
944     (void)_method;
945     return false;
946 }
947 
get_max_allowed_reconnects() const948 std::uint32_t tcp_client_endpoint_impl::get_max_allowed_reconnects() const {
949     return MAX_RECONNECTS_UNLIMITED;
950 }
951 
max_allowed_reconnects_reached()952 void tcp_client_endpoint_impl::max_allowed_reconnects_reached() {
953     return;
954 }
955 
wait_until_sent(const boost::system::error_code & _error)956 void tcp_client_endpoint_impl::wait_until_sent(const boost::system::error_code &_error) {
957 
958     std::unique_lock<std::mutex> its_sent_lock(sent_mutex_);
959     if (!is_sending_ || !_error) {
960         its_sent_lock.unlock();
961         if (!_error)
962             VSOMEIP_WARNING << __func__
963                 << ": Maximum wait time for send operation exceeded for tce.";
964 
965         std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
966         its_ep_host->on_disconnect(shared_from_this());
967         restart(true);
968     } else {
969         std::chrono::milliseconds its_timeout(VSOMEIP_MAX_TCP_SENT_WAIT_TIME);
970         boost::system::error_code ec;
971         sent_timer_.expires_from_now(its_timeout, ec);
972         sent_timer_.async_wait(std::bind(&tcp_client_endpoint_impl::wait_until_sent,
973                 std::dynamic_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
974                 std::placeholders::_1));
975     }
976 }
977 
978 } // namespace vsomeip_v3
979