1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include <iomanip>
7 #include <sstream>
8 #include <limits>
9 #include <thread>
10 #include <algorithm>
11 
12 #include <boost/asio/buffer.hpp>
13 #include <boost/asio/ip/tcp.hpp>
14 #include <boost/asio/ip/udp_ext.hpp>
15 #include <boost/asio/local/stream_protocol_ext.hpp>
16 
17 #include <vsomeip/defines.hpp>
18 #include <vsomeip/internal/logger.hpp>
19 
20 #include "../include/server_endpoint_impl.hpp"
21 #include "../include/endpoint_definition.hpp"
22 
23 #include "../../utility/include/byteorder.hpp"
24 #include "../../utility/include/utility.hpp"
25 #include "../../service_discovery/include/defines.hpp"
26 
27 namespace vsomeip_v3 {
28 
29 template<typename Protocol>
server_endpoint_impl(const std::shared_ptr<endpoint_host> & _endpoint_host,const std::shared_ptr<routing_host> & _routing_host,endpoint_type _local,boost::asio::io_service & _io,std::uint32_t _max_message_size,configuration::endpoint_queue_limit_t _queue_limit,const std::shared_ptr<configuration> & _configuration)30 server_endpoint_impl<Protocol>::server_endpoint_impl(
31         const std::shared_ptr<endpoint_host>& _endpoint_host,
32         const std::shared_ptr<routing_host>& _routing_host, endpoint_type _local,
33         boost::asio::io_service &_io, std::uint32_t _max_message_size,
34         configuration::endpoint_queue_limit_t _queue_limit,
35         const std::shared_ptr<configuration>& _configuration)
36     : endpoint_impl<Protocol>(_endpoint_host, _routing_host, _local, _io, _max_message_size,
37                               _queue_limit, _configuration),
38                               sent_timer_(_io) {
39     is_sending_ = false;
40 }
41 
42 template<typename Protocol>
~server_endpoint_impl()43 server_endpoint_impl<Protocol>::~server_endpoint_impl() {
44 }
45 
46 template<typename Protocol>
prepare_stop(endpoint::prepare_stop_handler_t _handler,service_t _service)47 void server_endpoint_impl<Protocol>::prepare_stop(
48         endpoint::prepare_stop_handler_t _handler, service_t _service) {
49     std::lock_guard<std::mutex> its_lock(mutex_);
50     bool queued_train(false);
51     if (_service == ANY_SERVICE) { // endpoint is shutting down completely
52         endpoint_impl<Protocol>::sending_blocked_ = true;
53         boost::system::error_code ec;
54         for (auto const& train_iter : trains_) {
55             train_iter.second->departure_timer_->cancel(ec);
56             if (train_iter.second->buffer_->size() > 0) {
57                 auto target_queue_iter = queues_.find(train_iter.first);
58                 if (target_queue_iter != queues_.end()) {
59                     auto& its_qpair = target_queue_iter->second;
60                     const bool queue_size_zero_on_entry(its_qpair.second.empty());
61                     queue_train(target_queue_iter, train_iter.second,
62                             queue_size_zero_on_entry);
63                     queued_train = true;
64                 }
65             }
66         }
67     } else {
68         for (auto const& train_iter : trains_) {
69             for (auto const& passenger_iter : train_iter.second->passengers_) {
70                 if (passenger_iter.first == _service) {
71                     // cancel departure timer
72                     boost::system::error_code ec;
73                     train_iter.second->departure_timer_->cancel(ec);
74                     // queue train
75                     auto target_queue_iter = queues_.find(train_iter.first);
76                     if (target_queue_iter != queues_.end()) {
77                         const auto& its_qpair = target_queue_iter->second;
78                         const bool queue_size_zero_on_entry(its_qpair.second.empty());
79                         queue_train(target_queue_iter, train_iter.second,
80                                 queue_size_zero_on_entry);
81                         queued_train = true;
82                     }
83                     break;
84                 }
85             }
86         }
87     }
88     if (!queued_train) {
89         if (_service == ANY_SERVICE) {
90             if (std::all_of(queues_.begin(), queues_.end(),
91                             [&](const typename queue_type::value_type& q)
92                                 { return q.second.second.empty(); })) {
93                 // nothing was queued and all queues are empty -> ensure cbk is called
94                 auto ptr = this->shared_from_this();
95                 endpoint_impl<Protocol>::service_.post([ptr, _handler, _service](){
96                                                             _handler(ptr, _service);
97                                                         });
98             } else {
99                 prepare_stop_handlers_[_service] = _handler;
100             }
101         } else {
102             // check if any of the queues contains a message of to be stopped service
103             bool found_service_msg(false);
104             for (const auto& q : queues_) {
105                 for (const auto& msg : q.second.second ) {
106                     const service_t its_service = VSOMEIP_BYTES_TO_WORD(
107                                             (*msg)[VSOMEIP_SERVICE_POS_MIN],
108                                             (*msg)[VSOMEIP_SERVICE_POS_MAX]);
109                     if (its_service == _service) {
110                         found_service_msg = true;
111                         break;
112                     }
113                 }
114                 if (found_service_msg) {
115                     break;
116                 }
117             }
118             if (found_service_msg) {
119                 prepare_stop_handlers_[_service] = _handler;
120             } else { // no messages of the to be stopped service are or have been queued
121                 auto ptr = this->shared_from_this();
122                 endpoint_impl<Protocol>::service_.post([ptr, _handler, _service](){
123                                                             _handler(ptr, _service);
124                                                         });
125             }
126         }
127     } else {
128         prepare_stop_handlers_[_service] = _handler;
129     }
130 }
131 
132 template<typename Protocol>
stop()133 void server_endpoint_impl<Protocol>::stop() {
134 }
135 
136 template<typename Protocol>
is_client() const137 bool server_endpoint_impl<Protocol>::is_client() const {
138     return false;
139 }
140 
141 template<typename Protocol>
restart(bool _force)142 void server_endpoint_impl<Protocol>::restart(bool _force) {
143     (void)_force;
144     // intentionally left blank
145 }
146 
147 template<typename Protocol>
is_established() const148 bool server_endpoint_impl<Protocol>::is_established() const {
149     return true;
150 }
151 
152 template<typename Protocol>
is_established_or_connected() const153 bool server_endpoint_impl<Protocol>::is_established_or_connected() const {
154     return true;
155 }
156 
157 template<typename Protocol>
set_established(bool _established)158 void server_endpoint_impl<Protocol>::set_established(bool _established) {
159     (void) _established;
160 }
161 
162 template<typename Protocol>
set_connected(bool _connected)163 void server_endpoint_impl<Protocol>::set_connected(bool _connected) {
164     (void) _connected;
165 }
166 
send(const uint8_t * _data,uint32_t _size)167 template<typename Protocol>bool server_endpoint_impl<Protocol>::send(const uint8_t *_data,
168         uint32_t _size) {
169 #if 0
170     std::stringstream msg;
171     msg << "sei::send ";
172     for (uint32_t i = 0; i < _size; i++)
173         msg << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
174     VSOMEIP_INFO << msg.str();
175 #endif
176     endpoint_type its_target;
177     bool is_valid_target(false);
178 
179     if (VSOMEIP_SESSION_POS_MAX < _size) {
180         std::lock_guard<std::mutex> its_lock(mutex_);
181 
182         if(endpoint_impl<Protocol>::sending_blocked_) {
183             return false;
184         }
185 
186         const service_t its_service = VSOMEIP_BYTES_TO_WORD(
187                 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
188         const method_t its_method = VSOMEIP_BYTES_TO_WORD(
189                 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
190         const client_t its_client = VSOMEIP_BYTES_TO_WORD(
191                 _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]);
192         const session_t its_session = VSOMEIP_BYTES_TO_WORD(
193                 _data[VSOMEIP_SESSION_POS_MIN], _data[VSOMEIP_SESSION_POS_MAX]);
194 
195         requests_mutex_.lock();
196         auto found_client = requests_.find(its_client);
197         if (found_client != requests_.end()) {
198             auto its_request = std::make_tuple(its_service, its_method, its_session);
199             auto found_request = found_client->second.find(its_request);
200             if (found_request != found_client->second.end()) {
201                 its_target = found_request->second;
202                 is_valid_target = true;
203                 found_client->second.erase(found_request);
204             } else {
205                 VSOMEIP_WARNING << "server_endpoint::send: request ["
206                         << std::hex << std::setw(4) << std::setfill('0')
207                         << its_service << "."
208                         << std::hex << std::setw(4) << std::setfill('0')
209                         << its_method << "/"
210                         << std::hex << std::setw(4) << std::setfill('0')
211                         << its_client << "."
212                         << std::hex << std::setw(4) << std::setfill('0')
213                         << its_session
214                         << "] could not be found.";
215                 if (its_service == VSOMEIP_SD_SERVICE
216                         && its_method == VSOMEIP_SD_METHOD) {
217                     VSOMEIP_ERROR << "Clearing clients map as a request was "
218                             "received on SD port";
219                     requests_.clear();
220                     is_valid_target = get_default_target(its_service, its_target);
221                 }
222             }
223         } else {
224             is_valid_target = get_default_target(its_service, its_target);
225         }
226         requests_mutex_.unlock();
227 
228         if (is_valid_target) {
229             is_valid_target = send_intern(its_target, _data, _size);
230         }
231     }
232     return is_valid_target;
233 }
234 
235 template<typename Protocol>
send(const std::vector<byte_t> & _cmd_header,const byte_t * _data,uint32_t _size)236 bool server_endpoint_impl<Protocol>::send(
237         const std::vector<byte_t>& _cmd_header, const byte_t *_data,
238         uint32_t _size) {
239     (void) _cmd_header;
240     (void) _data;
241     (void) _size;
242     return false;
243 }
244 
245 template<typename Protocol>
send_intern(endpoint_type _target,const byte_t * _data,uint32_t _size)246 bool server_endpoint_impl<Protocol>::send_intern(
247         endpoint_type _target, const byte_t *_data, uint32_t _size) {
248 
249     switch (check_message_size(_data, _size, _target)) {
250         case endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT:
251             return true;
252             break;
253         case endpoint_impl<Protocol>::cms_ret_e::MSG_TOO_BIG:
254             return false;
255             break;
256         case endpoint_impl<Protocol>::cms_ret_e::MSG_OK:
257         default:
258             break;
259     }
260     if (!prepare_stop_handlers_.empty()) {
261         const service_t its_service = VSOMEIP_BYTES_TO_WORD(
262                 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
263         if (prepare_stop_handlers_.find(its_service) != prepare_stop_handlers_.end()) {
264             const method_t its_method = VSOMEIP_BYTES_TO_WORD(
265                     _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
266             const client_t its_client = VSOMEIP_BYTES_TO_WORD(
267                     _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]);
268             const session_t its_session = VSOMEIP_BYTES_TO_WORD(
269                     _data[VSOMEIP_SESSION_POS_MIN], _data[VSOMEIP_SESSION_POS_MAX]);
270             VSOMEIP_WARNING << "server_endpoint::send: Service is stopping, ignoring message: ["
271                     << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
272                     << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
273                     << std::hex << std::setw(4) << std::setfill('0') << its_client << "."
274                     << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
275             return false;
276         }
277     }
278 
279     const queue_iterator_type target_queue_iterator = find_or_create_queue_unlocked(_target);
280 
281     bool must_depart(false);
282 
283 #if 0
284     std::stringstream msg;
285     msg << "sei::send_intern: ";
286     for (uint32_t i = 0; i < _size; i++)
287     msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
288     VSOMEIP_DEBUG << msg.str();
289 #endif
290     // STEP 1: determine the correct train
291     std::shared_ptr<train> target_train = find_or_create_train_unlocked(_target);
292 
293     const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty());
294     if (!check_queue_limit(_data, _size, target_queue_iterator->second.first)) {
295         return false;
296     }
297     // STEP 2: Determine elapsed time and update the departure time and cancel the timer
298     target_train->update_departure_time_and_stop_departure();
299 
300     // STEP 3: Get configured timings
301     const service_t its_service = VSOMEIP_BYTES_TO_WORD(
302             _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
303     const method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
304             _data[VSOMEIP_METHOD_POS_MAX]);
305 
306     std::chrono::nanoseconds its_debouncing(0), its_retention(0);
307     if (its_service != VSOMEIP_SD_SERVICE && its_method != VSOMEIP_SD_METHOD) {
308         get_configured_times_from_endpoint(its_service, its_method,
309                 &its_debouncing, &its_retention);
310     }
311 
312     // STEP 4: Check if the passenger enters an empty train
313     const std::pair<service_t, method_t> its_identifier = std::make_pair(
314             its_service, its_method);
315     if (target_train->passengers_.empty()) {
316         target_train->departure_ = its_retention;
317     } else {
318         if (target_train->passengers_.end()
319                 != target_train->passengers_.find(its_identifier)) {
320             must_depart = true;
321         } else {
322             // STEP 5: Check whether the current message fits into the current train
323             if (target_train->buffer_->size() + _size > endpoint_impl<Protocol>::max_message_size_) {
324                 must_depart = true;
325             } else {
326                 // STEP 6: Check debouncing time
327                 if (its_debouncing > target_train->minimal_max_retention_time_) {
328                     // train's latest departure would already undershot new
329                     // passenger's debounce time
330                     must_depart = true;
331                 } else {
332                     if (its_debouncing > target_train->departure_) {
333                         // train departs earlier as the new passenger's debounce
334                         // time allows
335                         must_depart = true;
336                     } else {
337                         // STEP 7: Check maximum retention time
338                         if (its_retention < target_train->minimal_debounce_time_) {
339                             // train's earliest departure would already exceed
340                             // the new passenger's retention time.
341                             must_depart = true;
342                         } else {
343                             if (its_retention < target_train->departure_) {
344                                 target_train->departure_ = its_retention;
345                             }
346                         }
347                     }
348                 }
349             }
350         }
351     }
352 
353     // STEP 8: if necessary, send current buffer and create a new one
354     if (must_depart) {
355         // STEP 8.1: check if debounce time would be undershot here if the train
356         // departs. Block sending until train is allowed to depart.
357         wait_until_debounce_time_reached(target_train);
358         queue_train(target_queue_iterator, target_train,
359                 queue_size_zero_on_entry);
360         target_train->departure_ = its_retention;
361     }
362 
363     // STEP 9: insert current message buffer
364     target_train->buffer_->insert(target_train->buffer_->end(), _data, _data + _size);
365     target_train->passengers_.insert(its_identifier);
366     // STEP 9.1: update the trains minimal debounce time if necessary
367     if (its_debouncing < target_train->minimal_debounce_time_) {
368         target_train->minimal_debounce_time_ = its_debouncing;
369     }
370     // STEP 9.2: update the trains minimal maximum retention time if necessary
371     if (its_retention < target_train->minimal_max_retention_time_) {
372         target_train->minimal_max_retention_time_ = its_retention;
373     }
374 
375     // STEP 10: restart timer with current departure time
376 #ifndef _WIN32
377     target_train->departure_timer_->expires_from_now(target_train->departure_);
378 #else
379     target_train->departure_timer_->expires_from_now(
380             std::chrono::duration_cast<
381                 std::chrono::steady_clock::duration>(target_train->departure_));
382 #endif
383     target_train->departure_timer_->async_wait(
384         std::bind(&server_endpoint_impl<Protocol>::flush_cbk,
385                   this->shared_from_this(), _target,
386                   target_train, std::placeholders::_1));
387 
388     return (true);
389 }
390 
391 template<typename Protocol>
send_segments(const tp::tp_split_messages_t & _segments,const endpoint_type & _target)392 void server_endpoint_impl<Protocol>::send_segments(
393         const tp::tp_split_messages_t &_segments, const endpoint_type &_target) {
394 
395     if (_segments.size() == 0)
396         return;
397 
398     const queue_iterator_type target_queue_iterator = find_or_create_queue_unlocked(_target);
399     const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty());
400 
401     std::shared_ptr<train> target_train = find_or_create_train_unlocked(_target);
402     target_train->update_departure_time_and_stop_departure();
403 
404     const service_t its_service = VSOMEIP_BYTES_TO_WORD(
405             (*(_segments[0]))[VSOMEIP_SERVICE_POS_MIN], (*(_segments[0]))[VSOMEIP_SERVICE_POS_MAX]);
406     const method_t its_method = VSOMEIP_BYTES_TO_WORD(
407             (*(_segments[0]))[VSOMEIP_METHOD_POS_MIN], (*(_segments[0]))[VSOMEIP_METHOD_POS_MAX]);
408 
409     std::chrono::nanoseconds its_debouncing(0), its_retention(0);
410     if (its_service != VSOMEIP_SD_SERVICE && its_method != VSOMEIP_SD_METHOD) {
411         get_configured_times_from_endpoint(its_service, its_method,
412                 &its_debouncing, &its_retention);
413     }
414     // update the trains minimal debounce time if necessary
415     if (its_debouncing < target_train->minimal_debounce_time_) {
416         target_train->minimal_debounce_time_ = its_debouncing;
417     }
418     // update the trains minimal maximum retention time if necessary
419     if (its_retention < target_train->minimal_max_retention_time_) {
420         target_train->minimal_max_retention_time_ = its_retention;
421     }
422     // We only need to respect the debouncing. There is no need to wait for further
423     // messages as we will send several now anyway.
424     if (!target_train->passengers_.empty()) {
425         wait_until_debounce_time_reached(target_train);
426         queue_train(target_queue_iterator, target_train, queue_size_zero_on_entry);
427     }
428 
429     const bool queue_size_still_zero(target_queue_iterator->second.second.empty());
430     for (const auto &s : _segments) {
431         target_queue_iterator->second.second.emplace_back(s);
432         target_queue_iterator->second.first += s->size();
433     }
434     if (queue_size_still_zero && !target_queue_iterator->second.second.empty()) { // no writing in progress
435         // respect minimal debounce time
436         wait_until_debounce_time_reached(target_train);
437         // ignore retention time and send immediately as the train is full anyway
438         send_queued(target_queue_iterator);
439     }
440     target_train->last_departure_ = std::chrono::steady_clock::now();
441 }
442 
443 template<typename Protocol>
wait_until_debounce_time_reached(const std::shared_ptr<train> & _train) const444 void server_endpoint_impl<Protocol>::wait_until_debounce_time_reached(
445         const std::shared_ptr<train>& _train) const {
446     const std::chrono::nanoseconds time_since_last_departure =
447             std::chrono::duration_cast<std::chrono::nanoseconds>(
448                     std::chrono::steady_clock::now() - _train->last_departure_);
449 
450     if (time_since_last_departure < _train->minimal_debounce_time_) {
451         std::this_thread::sleep_for(
452                 _train->minimal_debounce_time_ - time_since_last_departure);
453     }
454 }
455 
456 
457 template<typename Protocol>
check_message_size(const std::uint8_t * const _data,std::uint32_t _size,const endpoint_type & _target)458 typename endpoint_impl<Protocol>::cms_ret_e server_endpoint_impl<Protocol>::check_message_size(
459         const std::uint8_t * const _data, std::uint32_t _size,
460         const endpoint_type& _target) {
461     typename endpoint_impl<Protocol>::cms_ret_e ret(endpoint_impl<Protocol>::cms_ret_e::MSG_OK);
462     if (endpoint_impl<Protocol>::max_message_size_ != MESSAGE_SIZE_UNLIMITED
463             && _size > endpoint_impl<Protocol>::max_message_size_) {
464         if (endpoint_impl<Protocol>::is_supporting_someip_tp_ && _data != nullptr) {
465             const service_t its_service = VSOMEIP_BYTES_TO_WORD(
466                     _data[VSOMEIP_SERVICE_POS_MIN],
467                     _data[VSOMEIP_SERVICE_POS_MAX]);
468             const method_t its_method = VSOMEIP_BYTES_TO_WORD(
469                     _data[VSOMEIP_METHOD_POS_MIN],
470                     _data[VSOMEIP_METHOD_POS_MAX]);
471             if (tp_segmentation_enabled(its_service, its_method)) {
472                 send_segments(tp::tp::tp_split_message(_data, _size), _target);
473                 return endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT;
474             }
475         }
476         VSOMEIP_ERROR << "sei::send_intern: Dropping too big message (" << _size
477                 << " Bytes). Maximum allowed message size is: "
478                 << endpoint_impl<Protocol>::max_message_size_ << " Bytes.";
479         ret = endpoint_impl<Protocol>::cms_ret_e::MSG_TOO_BIG;
480     }
481     return ret;
482 }
483 
484 template<typename Protocol>
check_queue_limit(const uint8_t * _data,std::uint32_t _size,std::size_t _current_queue_size) const485 bool server_endpoint_impl<Protocol>::check_queue_limit(const uint8_t *_data, std::uint32_t _size,
486                        std::size_t _current_queue_size) const {
487     if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
488             && _current_queue_size + _size
489                     > endpoint_impl<Protocol>::queue_limit_) {
490         service_t its_service(0);
491         method_t its_method(0);
492         client_t its_client(0);
493         session_t its_session(0);
494         if (_size >= VSOMEIP_SESSION_POS_MAX) {
495             // this will yield wrong IDs for local communication as the commands
496             // are prepended to the actual payload
497             // it will print:
498             // (lowbyte service ID + highbyte methoid)
499             // [(Command + lowerbyte sender's client ID).
500             //  highbyte sender's client ID + lowbyte command size.
501             //  lowbyte methodid + highbyte vsomeip length]
502             its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
503                                                 _data[VSOMEIP_SERVICE_POS_MAX]);
504             its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
505                                                _data[VSOMEIP_METHOD_POS_MAX]);
506             its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
507                                                _data[VSOMEIP_CLIENT_POS_MAX]);
508             its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
509                                                 _data[VSOMEIP_SESSION_POS_MAX]);
510         }
511         VSOMEIP_ERROR << "sei::send_intern: queue size limit (" << std::dec
512                 << endpoint_impl<Protocol>::queue_limit_
513                 << ") reached. Dropping message ("
514                 << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
515                 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
516                 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
517                 << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
518                 << " queue_size: " << std::dec << _current_queue_size
519                 << " data size: " << std::dec << _size;
520         return false;
521     }
522     return true;
523 }
524 
525 template<typename Protocol>
queue_train(const queue_iterator_type _queue_iterator,const std::shared_ptr<train> & _train,bool _queue_size_zero_on_entry)526 void server_endpoint_impl<Protocol>::queue_train(
527                      const queue_iterator_type _queue_iterator,
528                      const std::shared_ptr<train>& _train,
529                      bool _queue_size_zero_on_entry) {
530     _queue_iterator->second.second.emplace_back(_train->buffer_);
531     _queue_iterator->second.first += _train->buffer_->size();
532     _train->last_departure_ = std::chrono::steady_clock::now();
533     _train->passengers_.clear();
534     _train->buffer_ = std::make_shared<message_buffer_t>();
535     _train->minimal_debounce_time_ = std::chrono::nanoseconds::max();
536     _train->minimal_max_retention_time_ = std::chrono::nanoseconds::max();
537     if (_queue_size_zero_on_entry && !_queue_iterator->second.second.empty()) { // no writing in progress
538         send_queued(_queue_iterator);
539     }
540 }
541 
542 template<typename Protocol>
543 typename server_endpoint_impl<Protocol>::queue_iterator_type
find_or_create_queue_unlocked(const endpoint_type & _target)544 server_endpoint_impl<Protocol>::find_or_create_queue_unlocked(const endpoint_type& _target) {
545     queue_iterator_type target_queue_iterator = queues_.find(_target);
546     if (target_queue_iterator == queues_.end()) {
547         target_queue_iterator = queues_.insert(queues_.begin(),
548                                     std::make_pair(
549                                         _target,
550                                         std::make_pair(std::size_t(0),
551                                                        std::deque<message_buffer_ptr_t>())
552                                     ));
553     }
554     return target_queue_iterator;
555 }
556 
557 template<typename Protocol>
find_or_create_train_unlocked(const endpoint_type & _target)558 std::shared_ptr<train> server_endpoint_impl<Protocol>::find_or_create_train_unlocked(
559         const endpoint_type& _target) {
560     auto train_iter = trains_.find(_target);
561     if (train_iter == trains_.end()) {
562         train_iter = trains_.insert(trains_.begin(),
563                                     std::make_pair(_target, std::make_shared<train>(this->service_)));
564     }
565     return train_iter->second;
566 }
567 
568 template<typename Protocol>
flush(endpoint_type _target,const std::shared_ptr<train> & _train)569 bool server_endpoint_impl<Protocol>::flush(
570         endpoint_type _target,
571         const std::shared_ptr<train>& _train) {
572     std::lock_guard<std::mutex> its_lock(mutex_);
573     bool is_flushed = false;
574     if (!_train->buffer_->empty()) {
575         const queue_iterator_type target_queue_iterator = queues_.find(_target);
576         if (target_queue_iterator != queues_.end()) {
577             const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty());
578             queue_train(target_queue_iterator, _train, queue_size_zero_on_entry);
579             is_flushed = true;
580         } else {
581             std::stringstream ss;
582             ss << "sei::flush couldn't find target queue, won't  queue train to: "
583                     << get_remote_information(_target) << " passengers: ";
584             for (const auto& p : _train->passengers_) {
585                 ss << "["  << std::hex << std::setw(4) << std::setfill('0')
586                     << p.first << ":" << p.second << "] ";
587             }
588             VSOMEIP_WARNING << ss.str();
589         }
590     }
591     return is_flushed;
592 }
593 
594 template<typename Protocol>
connect_cbk(boost::system::error_code const & _error)595 void server_endpoint_impl<Protocol>::connect_cbk(
596         boost::system::error_code const &_error) {
597     (void)_error;
598 }
599 
600 template<typename Protocol>
send_cbk(const queue_iterator_type _queue_iterator,boost::system::error_code const & _error,std::size_t _bytes)601 void server_endpoint_impl<Protocol>::send_cbk(
602         const queue_iterator_type _queue_iterator,
603         boost::system::error_code const &_error, std::size_t _bytes) {
604     (void)_bytes;
605 
606     {
607         std::lock_guard<std::mutex> its_sent_lock(sent_mutex_);
608         is_sending_ = false;
609 
610         boost::system::error_code ec;
611         sent_timer_.cancel(ec);
612     }
613 
614     std::lock_guard<std::mutex> its_lock(mutex_);
615 
616     auto check_if_all_msgs_for_stopped_service_are_sent = [&]() {
617         bool found_service_msg(false);
618         service_t its_stopped_service(ANY_SERVICE);
619         for (auto stp_hndlr_iter = prepare_stop_handlers_.begin();
620                   stp_hndlr_iter != prepare_stop_handlers_.end();) {
621             its_stopped_service = stp_hndlr_iter->first;
622             if (its_stopped_service == ANY_SERVICE) {
623                 ++stp_hndlr_iter;
624                 continue;
625             }
626             for (const auto& q : queues_) {
627                 for (const auto& msg : q.second.second ) {
628                     const service_t its_service = VSOMEIP_BYTES_TO_WORD(
629                                             (*msg)[VSOMEIP_SERVICE_POS_MIN],
630                                             (*msg)[VSOMEIP_SERVICE_POS_MAX]);
631                     if (its_service == its_stopped_service) {
632                         found_service_msg = true;
633                         break;
634                     }
635                 }
636                 if (found_service_msg) {
637                     break;
638                 }
639             }
640             if (found_service_msg) {
641                 ++stp_hndlr_iter;
642                 found_service_msg = false;
643             } else { // all messages of the to be stopped service have been sent
644                 auto handler = stp_hndlr_iter->second;
645                 auto ptr = this->shared_from_this();
646                 #ifndef _WIN32
647                 endpoint_impl<Protocol>::
648                 #endif
649                     service_.post([ptr, handler, its_stopped_service](){
650                         handler(ptr, its_stopped_service);
651                     });
652                 stp_hndlr_iter = prepare_stop_handlers_.erase(stp_hndlr_iter);
653             }
654         }
655     };
656 
657     auto check_if_all_queues_are_empty = [&](){
658         if (prepare_stop_handlers_.size() > 1) {
659             // before the endpoint was stopped completely other
660             // prepare_stop_handlers have been queued ensure to call them as well
661             check_if_all_msgs_for_stopped_service_are_sent();
662         }
663         if (std::all_of(queues_.begin(), queues_.end(), [&]
664                         #ifndef _WIN32
665                            (const typename queue_type::value_type& q)
666                         #else
667                            (const std::pair<endpoint_type,std::pair<size_t, std::deque<message_buffer_ptr_t>>>& q)
668                         #endif
669                            { return q.second.second.empty(); })) {
670             // all outstanding response have been sent.
671             auto found_cbk = prepare_stop_handlers_.find(ANY_SERVICE);
672             if (found_cbk != prepare_stop_handlers_.end()) {
673                 auto handler = found_cbk->second;
674                 auto ptr = this->shared_from_this();
675                 #ifndef _WIN32
676                 endpoint_impl<Protocol>::
677                 #endif
678                     service_.post([ptr, handler](){
679                             handler(ptr, ANY_SERVICE);
680                     });
681                 prepare_stop_handlers_.erase(found_cbk);
682             }
683         }
684     };
685 
686     auto& its_qpair = _queue_iterator->second;
687     if (!_error) {
688         its_qpair.first -= its_qpair.second.front()->size();
689         its_qpair.second.pop_front();
690 
691         if (!prepare_stop_handlers_.empty() && !endpoint_impl<Protocol>::sending_blocked_) {
692             // only one service instance is stopped
693             check_if_all_msgs_for_stopped_service_are_sent();
694         }
695 
696         if (its_qpair.second.size() > 0) {
697             send_queued(_queue_iterator);
698         } else if (!prepare_stop_handlers_.empty() && endpoint_impl<Protocol>::sending_blocked_) {
699             // endpoint is shutting down completely
700             queues_.erase(_queue_iterator);
701             check_if_all_queues_are_empty();
702         }
703     } else {
704         message_buffer_ptr_t its_buffer;
705         if (_queue_iterator->second.second.size()) {
706             its_buffer = _queue_iterator->second.second.front();
707         }
708         service_t its_service(0);
709         method_t its_method(0);
710         client_t its_client(0);
711         session_t its_session(0);
712         if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) {
713             its_service = VSOMEIP_BYTES_TO_WORD(
714                     (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
715                     (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
716             its_method = VSOMEIP_BYTES_TO_WORD(
717                     (*its_buffer)[VSOMEIP_METHOD_POS_MIN],
718                     (*its_buffer)[VSOMEIP_METHOD_POS_MAX]);
719             its_client = VSOMEIP_BYTES_TO_WORD(
720                     (*its_buffer)[VSOMEIP_CLIENT_POS_MIN],
721                     (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]);
722             its_session = VSOMEIP_BYTES_TO_WORD(
723                     (*its_buffer)[VSOMEIP_SESSION_POS_MIN],
724                     (*its_buffer)[VSOMEIP_SESSION_POS_MAX]);
725         }
726         // error: sending of outstanding responses isn't started again
727         // delete remaining outstanding responses
728         VSOMEIP_WARNING << "sei::send_cbk received error: " << _error.message()
729                 << " (" << std::dec << _error.value() << ") "
730                 << get_remote_information(_queue_iterator) << " "
731                 << std::dec << _queue_iterator->second.second.size() << " "
732                 << std::dec << _queue_iterator->second.first << " ("
733                 << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
734                 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
735                 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
736                 << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
737         queues_.erase(_queue_iterator);
738         if (!prepare_stop_handlers_.empty()) {
739             if (endpoint_impl<Protocol>::sending_blocked_) {
740                 // endpoint is shutting down completely, ensure to call
741                 // prepare_stop_handlers even in error cases
742                 check_if_all_queues_are_empty();
743             } else {
744                 // only one service instance is stopped
745                 check_if_all_msgs_for_stopped_service_are_sent();
746             }
747         }
748 
749     }
750 }
751 
752 template<typename Protocol>
flush_cbk(endpoint_type _target,const std::shared_ptr<train> & _train,const boost::system::error_code & _error_code)753 void server_endpoint_impl<Protocol>::flush_cbk(
754         endpoint_type _target,
755         const std::shared_ptr<train>& _train, const boost::system::error_code &_error_code) {
756     if (!_error_code) {
757         (void) flush(_target, _train);
758     }
759 }
760 
761 template<typename Protocol>
get_queue_size() const762 size_t server_endpoint_impl<Protocol>::get_queue_size() const {
763     size_t its_queue_size(0);
764 
765     {
766         std::lock_guard<std::mutex> its_lock(mutex_);
767         for (const auto &q : queues_) {
768             its_queue_size += q.second.second.size();
769         }
770     }
771 
772     return its_queue_size;
773 }
774 
// Explicit instantiations of server_endpoint_impl. The member functions are
// defined in this source file, so the protocol variants are instantiated here.
// The local stream protocol variant is omitted when _WIN32 is defined.
#ifndef _WIN32
template class server_endpoint_impl<boost::asio::local::stream_protocol_ext>;
#endif
template class server_endpoint_impl<boost::asio::ip::tcp>;
template class server_endpoint_impl<boost::asio::ip::udp_ext>;
781 
782 }  // namespace vsomeip_v3
783