1 // Copyright (C) 2019 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <iomanip>
7
8 #include "../include/tp_reassembler.hpp"
9
10 #include <vsomeip/defines.hpp>
11 #include <vsomeip/enumeration_types.hpp>
12 #include <vsomeip/internal/logger.hpp>
13
14 #include "../include/tp.hpp"
15 #include "../../utility/include/byteorder.hpp"
16
17 #ifdef ANDROID
18 #include "../../configuration/include/internal_android.hpp"
19 #else
20 #include "../../configuration/include/internal.hpp"
21 #endif // ANDROID
22
23 namespace vsomeip_v3 {
24 namespace tp {
25
tp_reassembler(std::uint32_t _max_message_size,boost::asio::io_service & _io)26 tp_reassembler::tp_reassembler(std::uint32_t _max_message_size, boost::asio::io_service &_io) :
27 max_message_size_(_max_message_size),
28 cleanup_timer_running_(false),
29 cleanup_timer_(_io) {
30 }
31
process_tp_message(const byte_t * const _data,std::uint32_t _data_size,const boost::asio::ip::address & _address,std::uint16_t _port)32 std::pair<bool, message_buffer_t> tp_reassembler::process_tp_message(
33 const byte_t* const _data, std::uint32_t _data_size,
34 const boost::asio::ip::address& _address, std::uint16_t _port) {
35 std::pair<bool, message_buffer_t> ret;
36 if (_data_size < VSOMEIP_FULL_HEADER_SIZE) {
37 return std::make_pair(false, message_buffer_t());
38 }
39
40 cleanup_timer_start(false);
41
42 const service_t its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
43 _data[VSOMEIP_SERVICE_POS_MAX]);
44 const method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
45 _data[VSOMEIP_METHOD_POS_MAX]);
46 const client_t its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
47 _data[VSOMEIP_CLIENT_POS_MAX]);
48 const session_t its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
49 _data[VSOMEIP_SESSION_POS_MAX]);
50 const interface_version_t its_interface_version = _data[VSOMEIP_INTERFACE_VERSION_POS];
51 const message_type_e its_msg_type = tp::tp_flag_unset(_data[VSOMEIP_MESSAGE_TYPE_POS]);
52
53 const std::uint64_t its_tp_message_id = ((static_cast<std::uint64_t>(its_service) << 48) |
54 (static_cast<std::uint64_t>(its_method) << 32) |
55 (static_cast<std::uint64_t>(its_client) << 16) |
56 (static_cast<std::uint64_t>(its_interface_version) << 8) |
57 (static_cast<std::uint64_t>(its_msg_type)));
58
59 std::lock_guard<std::mutex> its_lock(mutex_);
60 ret.first = false;
61 const auto found_ip = tp_messages_.find(_address);
62 if (found_ip != tp_messages_.end()) {
63 const auto found_port = found_ip->second.find(_port);
64 if (found_port != found_ip->second.end()) {
65 auto found_tp_msg = found_port->second.find(its_tp_message_id);
66 if (found_tp_msg != found_port->second.end()) {
67 if (found_tp_msg->second.first == its_session) {
68 // received additional segment for already known message
69 if (found_tp_msg->second.second.add_segment(_data, _data_size)) {
70 // message is complete
71 ret.first = true;
72 ret.second = found_tp_msg->second.second.get_message();
73 // cleanup tp_message as message was moved and cleanup map
74 found_port->second.erase(its_tp_message_id);
75 if (found_port->second.empty()) {
76 found_ip->second.erase(found_port);
77 if (found_ip->second.empty()) {
78 tp_messages_.erase(found_ip);
79 }
80 }
81 }
82 } else {
83 VSOMEIP_WARNING << __func__ << ": Received new segment "
84 "although old one is not finished yet. Dropping "
85 "old. ("
86 << std::hex << std::setw(4) << std::setfill('0') << its_client << ") ["
87 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
88 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
89 << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(its_interface_version) << "."
90 << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(its_msg_type) << "] Old: 0x"
91 << std::hex << std::setw(4) << std::setfill('0') << found_tp_msg->second.first << ", new: 0x"
92 << std::hex << std::setw(4) << std::setfill('0') << its_session;
93 // new segment with different session id -> throw away current
94 found_tp_msg->second.first = its_session;
95 found_tp_msg->second.second = tp_message(_data, _data_size, max_message_size_);
96 }
97 } else {
98 found_port->second.emplace(
99 std::make_pair(its_tp_message_id,
100 std::make_pair(its_session,
101 tp_message(_data, _data_size, max_message_size_))));
102 }
103 } else {
104 found_ip->second[_port].emplace(
105 std::make_pair(its_tp_message_id,
106 std::make_pair(its_session,
107 tp_message(_data, _data_size, max_message_size_))));
108 }
109 } else {
110 tp_messages_[_address][_port].emplace(
111 std::make_pair(its_tp_message_id,
112 std::make_pair(its_session,
113 tp_message(_data, _data_size, max_message_size_))));
114 }
115 return ret;
116 }
117
cleanup_unfinished_messages()118 bool tp_reassembler::cleanup_unfinished_messages() {
119 std::lock_guard<std::mutex> its_lock(mutex_);
120 const std::chrono::steady_clock::time_point now =
121 std::chrono::steady_clock::now();
122 for (auto ip_iter = tp_messages_.begin(); ip_iter != tp_messages_.end();) {
123 for (auto port_iter = ip_iter->second.begin();
124 port_iter != ip_iter->second.end();) {
125 for (auto tp_id_iter = port_iter->second.begin();
126 tp_id_iter != port_iter->second.end();) {
127 if (std::chrono::duration_cast<std::chrono::milliseconds>(
128 now - tp_id_iter->second.second.get_creation_time()).count()
129 > 5000) {
130 // message is older than 5 seconds delete it
131 const service_t its_service = static_cast<service_t>(tp_id_iter->first >> 48);
132 const method_t its_method = static_cast<method_t>(tp_id_iter->first >> 32);
133 const client_t its_client = static_cast<client_t>(tp_id_iter->first >> 16);
134 const interface_version_t its_interface_version = static_cast<interface_version_t>(tp_id_iter->first >> 8);
135 const message_type_e its_msg_type = static_cast<message_type_e>(tp_id_iter->first >> 0);
136 VSOMEIP_WARNING << __func__
137 << ": deleting unfinished SOME/IP-TP message from: "
138 << ip_iter->first.to_string() << ":" << std::dec
139 << port_iter->first << " ("
140 << std::hex << std::setw(4) << std::setfill('0') << its_client << ") ["
141 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
142 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
143 << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(its_interface_version) << "."
144 << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(its_msg_type) << "."
145 << std::hex << std::setw(4) << std::setfill('0') << tp_id_iter->second.first << "]";
146 tp_id_iter = port_iter->second.erase(tp_id_iter);
147 } else {
148 tp_id_iter++;
149 }
150 }
151 if (port_iter->second.empty()) {
152 port_iter = ip_iter->second.erase(port_iter);
153 } else {
154 port_iter++;
155 }
156 }
157 if (ip_iter->second.empty()) {
158 ip_iter = tp_messages_.erase(ip_iter);
159 } else {
160 ip_iter++;
161 }
162 }
163 return !tp_messages_.empty();
164 }
165
stop()166 void tp_reassembler::stop() {
167 std::lock_guard<std::mutex> its_lock(cleanup_timer_mutex_);
168 boost::system::error_code ec;
169 cleanup_timer_.cancel(ec);
170 }
171
cleanup_timer_start(bool _force)172 void tp_reassembler::cleanup_timer_start(bool _force) {
173 std::lock_guard<std::mutex> its_lock(cleanup_timer_mutex_);
174 cleanup_timer_start_unlocked(_force);
175 }
176
cleanup_timer_start_unlocked(bool _force)177 void tp_reassembler::cleanup_timer_start_unlocked(bool _force) {
178 boost::system::error_code ec;
179 if (!cleanup_timer_running_ || _force) {
180 cleanup_timer_.expires_from_now(std::chrono::seconds(5));
181 cleanup_timer_running_ = true;
182 cleanup_timer_.async_wait(
183 std::bind(&tp_reassembler::cleanup_timer_cbk,
184 shared_from_this(), std::placeholders::_1));
185 }
186 }
187
cleanup_timer_cbk(const boost::system::error_code _error)188 void tp_reassembler::cleanup_timer_cbk(
189 const boost::system::error_code _error) {
190 if (!_error) {
191 std::lock_guard<std::mutex> its_lock(cleanup_timer_mutex_);
192 if (cleanup_unfinished_messages()) {
193 cleanup_timer_start_unlocked(true);
194 } else {
195 // don't start timer again as there are no more segmented messages present
196 cleanup_timer_running_ = false;
197 }
198 }
199 }
200
201 } //namespace tp
202 } // namespace vsomeip_v3
203