// Copyright (C) 2015-2019 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include "../npdu_tests/npdu_test_client.hpp" #include #include "../../implementation/configuration/include/configuration.hpp" #include "../../implementation/configuration/include/configuration_impl.hpp" #include "../../implementation/configuration/include/configuration_plugin.hpp" #include "../../implementation/plugin/include/plugin_manager_impl.hpp" enum class payloadsize : std::uint8_t { UDS, TCP, UDP }; // this variables are changed via cmdline parameters static bool use_tcp = false; static bool call_service_sync = true; static bool wait_for_replies = true; static std::uint32_t sliding_window_size = vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND; static payloadsize max_payload_size = payloadsize::UDS; static bool shutdown_service_at_end = true; npdu_test_client::npdu_test_client( bool _use_tcp, bool _call_service_sync, std::uint32_t _sliding_window_size, bool _wait_for_replies, std::array, 4> _applicative_debounce) : app_(vsomeip::runtime::get()->create_application()), request_(vsomeip::runtime::get()->create_request(_use_tcp)), call_service_sync_(_call_service_sync), wait_for_replies_(_wait_for_replies), sliding_window_size_(_sliding_window_size), blocked_({false}), is_available_({false}), // will set first element to false, rest to 0 number_of_messages_to_send_(vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND), number_of_sent_messages_{0,0,0,0}, number_of_acknowledged_messages_{{{0,0,0,0},{0,0,0,0},{0,0,0,0},{0,0,0,0}}}, current_payload_size_({0}), all_msg_acknowledged_({false, false, false, false}), acknowledgements_{0,0,0,0}, applicative_debounce_(_applicative_debounce), finished_waiter_(&npdu_test_client::wait_for_all_senders, this) { senders_[0] = std::thread(&npdu_test_client::run<0>, this); senders_[1] = std::thread(&npdu_test_client::run<1>, this); senders_[2] = std::thread(&npdu_test_client::run<2>, this); senders_[3] = std::thread(&npdu_test_client::run<3>, this); } npdu_test_client::~npdu_test_client() { finished_waiter_.join(); } void npdu_test_client::init() { app_->init(); app_->register_state_handler( std::bind(&npdu_test_client::on_state, this, std::placeholders::_1)); register_availability_handler<0>(); register_availability_handler<1>(); register_availability_handler<2>(); register_availability_handler<3>(); register_message_handler_for_all_service_methods<0>(); register_message_handler_for_all_service_methods<1>(); register_message_handler_for_all_service_methods<2>(); register_message_handler_for_all_service_methods<3>(); request_->set_service(vsomeip_test::TEST_SERVICE_SERVICE_ID); request_->set_instance(vsomeip_test::TEST_SERVICE_INSTANCE_ID); if(!wait_for_replies_) request_->set_message_type(vsomeip::message_type_e::MT_REQUEST_NO_RETURN); } template void npdu_test_client::register_availability_handler() { app_->register_availability_handler(npdu_test::service_ids[service_idx], npdu_test::instance_ids[service_idx], std::bind( &npdu_test_client::on_availability, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } template void npdu_test_client::register_message_handler_for_all_service_methods() { register_message_handler(); register_message_handler(); register_message_handler(); register_message_handler(); } template void npdu_test_client::register_message_handler() { app_->register_message_handler(npdu_test::service_ids[service_idx], npdu_test::instance_ids[service_idx], npdu_test::method_ids[service_idx][method_idx], std::bind( &npdu_test_client::on_message, this, std::placeholders::_1)); } void npdu_test_client::start() { VSOMEIP_INFO << "Starting..."; app_->start(); } void npdu_test_client::stop() { VSOMEIP_INFO << "Stopping..."; app_->unregister_state_handler(); for (unsigned int i = 0; i< npdu_test::service_ids.size(); i++) { app_->unregister_availability_handler(npdu_test::service_ids[i], npdu_test::instance_ids[i]); for(unsigned int j = 0; j < npdu_test::method_ids[i].size(); j++) { app_->unregister_message_handler(npdu_test::service_ids[i], npdu_test::instance_ids[i], npdu_test::method_ids[i][j]); } } if(shutdown_service_at_end) { // notify the routing manager daemon that were finished request_->set_service(npdu_test::RMD_SERVICE_ID_CLIENT_SIDE); request_->set_instance(npdu_test::RMD_INSTANCE_ID); request_->set_method(npdu_test::RMD_SHUTDOWN_METHOD_ID); request_->set_payload(vsomeip::runtime::get()->create_payload()); request_->set_message_type(vsomeip::message_type_e::MT_REQUEST_NO_RETURN); app_->send(request_); // sleep otherwise the app will shutdown before the message reaches the rmd std::this_thread::sleep_for(std::chrono::seconds(5)); } app_->stop(); } void npdu_test_client::join_sender_thread() { for (auto& t : senders_) { t.join(); } } void npdu_test_client::on_state(vsomeip::state_type_e _state) { if(_state == vsomeip::state_type_e::ST_REGISTERED) { for (unsigned int i = 0; i< npdu_test::service_ids.size(); i++) { app_->request_service(npdu_test::service_ids[i], npdu_test::instance_ids[i]); } } } template void npdu_test_client::on_availability(vsomeip::service_t _service, vsomeip::instance_t _instance, bool _is_available) { VSOMEIP_INFO<< "Service [" << std::setw(4) << std::setfill('0') << std::hex << _service << "." << std::setw(4) << std::setfill('0') << _instance << "] is " << (_is_available ? "available." : "NOT available."); if(npdu_test::service_ids[service_idx] == _service && npdu_test::instance_ids[service_idx] == _instance) { if(is_available_[service_idx] && !_is_available) { is_available_[service_idx] = false; } else if(_is_available && !is_available_[service_idx]) { is_available_[service_idx] = true; send(); } } } template void npdu_test_client::on_message(const std::shared_ptr& _response) { (void)_response; //TODO make sure the replies were sent within demanded debounce times VSOMEIP_DEBUG << "Received reply from:" << std::setw(4) << std::setfill('0') << std::hex << npdu_test::service_ids[service_idx] << ":" << std::setw(4) << std::setfill('0') << std::hex << npdu_test::instance_ids[service_idx] << ":" << std::setw(4) << std::setfill('0') << std::hex << npdu_test::method_ids[service_idx][method_idx]; if(call_service_sync_) { // We notify the sender thread every time a message was acknowledged std::lock_guard lk(all_msg_acknowledged_mutexes_[service_idx][method_idx]); all_msg_acknowledged_[service_idx][method_idx] = true; all_msg_acknowledged_cvs_[service_idx][method_idx].notify_one(); } else { std::lock_guard its_lock(number_of_acknowledged_messages_mutexes_[service_idx][method_idx]); number_of_acknowledged_messages_[service_idx][method_idx]++; // We notify the sender thread only if all sent messages have been acknowledged if(number_of_acknowledged_messages_[service_idx][method_idx] == number_of_messages_to_send_) { std::lock_guard lk(all_msg_acknowledged_mutexes_[service_idx][method_idx]); // reset number_of_acknowledged_messages_[service_idx][method_idx] = 0; all_msg_acknowledged_[service_idx][method_idx] = true; all_msg_acknowledged_cvs_[service_idx][method_idx].notify_one(); } else if(number_of_acknowledged_messages_[service_idx][method_idx] % sliding_window_size == 0) { std::lock_guard lk(all_msg_acknowledged_mutexes_[service_idx][method_idx]); all_msg_acknowledged_[service_idx][method_idx] = true; all_msg_acknowledged_cvs_[service_idx][method_idx].notify_one(); } } } template void npdu_test_client::send() { std::lock_guard its_lock(mutexes_[service_idx]); blocked_[service_idx] = true; conditions_[service_idx].notify_one(); } template void npdu_test_client::run() { std::unique_lock its_lock(mutexes_[service_idx]); while (!blocked_[service_idx]) { conditions_[service_idx].wait(its_lock); } current_payload_size_[service_idx] = 1; std::uint32_t max_allowed_payload = get_max_allowed_payload(); for (int var = 0; var < 4; ++var) { payloads_[service_idx][var] = vsomeip::runtime::get()->create_payload(); payload_data_[service_idx][var] = std::vector(); } bool lastrun = false; while (current_payload_size_[service_idx] <= max_allowed_payload) { // prepare the payloads w/ current payloadsize for (int var = 0; var < 4; ++var) { // assign 0x11 to first, 0x22 to second... payload_data_[service_idx][var].assign( current_payload_size_[service_idx], static_cast(0x11 * (var + 1))); payloads_[service_idx][var]->set_data(payload_data_[service_idx][var]); } // send the payloads to the service's methods if(wait_for_replies_) { call_service_sync_ ? send_messages_sync() : send_messages_async(); } else { send_messages_and_dont_wait_for_reply(); } // Increase array size for next iteration current_payload_size_[service_idx] *= 2; //special case to test the biggest payload possible as last test // 16 Bytes are reserved for the SOME/IP header if(current_payload_size_[service_idx] > max_allowed_payload - 16 && !lastrun) { current_payload_size_[service_idx] = max_allowed_payload - 16; lastrun = true; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } blocked_[service_idx] = false; { std::lock_guard its_lock(finished_mutex_); finished_[service_idx] = true; } } std::uint32_t npdu_test_client::get_max_allowed_payload() { std::uint32_t payload; switch (max_payload_size) { case payloadsize::UDS: payload = VSOMEIP_MAX_LOCAL_MESSAGE_SIZE; break; case payloadsize::TCP: payload = 4095; break; case payloadsize::UDP: payload = VSOMEIP_MAX_UDP_MESSAGE_SIZE; break; default: payload = VSOMEIP_MAX_LOCAL_MESSAGE_SIZE; break; } return payload; } template void npdu_test_client::send_messages_sync() { std::thread t0 = start_send_thread_sync(); std::thread t1 = start_send_thread_sync(); std::thread t2 = start_send_thread_sync(); std::thread t3 = start_send_thread_sync(); t0.join(); t1.join(); t2.join(); t3.join(); } template std::thread npdu_test_client::start_send_thread_sync() { return std::thread([&]() { all_msg_acknowledged_unique_locks_[service_idx][method_idx] = std::unique_lock (all_msg_acknowledged_mutexes_[service_idx][method_idx]); std::shared_ptr request = vsomeip::runtime::get()->create_request(use_tcp); request->set_service(npdu_test::service_ids[service_idx]); request->set_instance(npdu_test::instance_ids[service_idx]); request->set_method(npdu_test::method_ids[service_idx][method_idx]); request->set_payload(payloads_[service_idx][method_idx]); for (std::uint32_t i = 0; i < number_of_messages_to_send_; i++) { all_msg_acknowledged_[service_idx][method_idx] = false; app_->send(request); std::chrono::high_resolution_clock::time_point sent = std::chrono::high_resolution_clock::now(); while(!all_msg_acknowledged_[service_idx][method_idx]) { all_msg_acknowledged_cvs_[service_idx][method_idx].wait( all_msg_acknowledged_unique_locks_[service_idx][method_idx]); } std::chrono::nanoseconds waited_for_response = std::chrono::high_resolution_clock::now() - sent; if(waited_for_response < applicative_debounce_[service_idx][method_idx]) { // make sure we don't send faster than debounce time + max retention time std::this_thread::sleep_for( applicative_debounce_[service_idx][method_idx] - waited_for_response); } } all_msg_acknowledged_unique_locks_[service_idx][method_idx].unlock(); }); } template void npdu_test_client::send_messages_async() { std::thread t0 = start_send_thread_async(); std::thread t1 = start_send_thread_async(); std::thread t2 = start_send_thread_async(); std::thread t3 = start_send_thread_async(); t0.join(); t1.join(); t2.join(); t3.join(); } template std::thread npdu_test_client::start_send_thread_async() { return std::thread([&]() { all_msg_acknowledged_unique_locks_[service_idx][method_idx] = std::unique_lock (all_msg_acknowledged_mutexes_[service_idx][method_idx]); std::shared_ptr request = vsomeip::runtime::get()->create_request(use_tcp); request->set_service(npdu_test::service_ids[service_idx]); request->set_instance(npdu_test::instance_ids[service_idx]); request->set_method(npdu_test::method_ids[service_idx][method_idx]); request->set_payload(payloads_[service_idx][method_idx]); for (std::uint32_t i = 0; i < number_of_messages_to_send_; i++) { app_->send(request); if((i+1) == number_of_messages_to_send_ || (i+1) % sliding_window_size == 0) { // wait until all send messages have been acknowledged // as long we wait lk is released; after wait returns lk is reacquired while(!all_msg_acknowledged_[service_idx][method_idx]) { all_msg_acknowledged_cvs_[service_idx][method_idx].wait( all_msg_acknowledged_unique_locks_[service_idx][method_idx]); } // Reset condition variable all_msg_acknowledged_[service_idx][method_idx] = false; } // make sure we don't send faster than debounce time + max retention time std::this_thread::sleep_for(applicative_debounce_[service_idx][method_idx]); } all_msg_acknowledged_unique_locks_[service_idx][method_idx].unlock(); }); } template void npdu_test_client::send_messages_and_dont_wait_for_reply() { std::thread t0 = start_send_thread(); std::thread t1 = start_send_thread(); std::thread t2 = start_send_thread(); std::thread t3 = start_send_thread(); t0.join(); t1.join(); t2.join(); t3.join(); } template std::thread npdu_test_client::start_send_thread() { return std::thread([&]() { std::shared_ptr request = vsomeip::runtime::get()->create_request(use_tcp); request->set_service(npdu_test::service_ids[service_idx]); request->set_instance(npdu_test::instance_ids[service_idx]); request->set_message_type(vsomeip::message_type_e::MT_REQUEST_NO_RETURN); request->set_method(npdu_test::method_ids[service_idx][method_idx]); request->set_payload(payloads_[service_idx][method_idx]); for (std::uint32_t i = 0; i < number_of_messages_to_send_; i++) { app_->send(request); // make sure we don't send faster than debounce time + max retention time std::this_thread::sleep_for(applicative_debounce_[service_idx][method_idx]); } }); } void npdu_test_client::wait_for_all_senders() { bool all_finished(false); while (!all_finished) { { std::lock_guard its_lock(finished_mutex_); if (std::all_of(finished_.begin(), finished_.end(), [](bool i) { return i; })) { all_finished = true; } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } join_sender_thread(); if (!wait_for_replies_ || !call_service_sync_) { // sleep longer here as sending is asynchronously and it's necessary // to wait until all messages have left the application VSOMEIP_INFO << "Sleeping for 180sec since the client is running " "in --dont-wait-for-replies or --async mode. " "Otherwise it might be possible that not all messages leave the " "application."; for(int i = 0; i < 180; i++) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "."; std::cout.flush(); } } else { std::this_thread::sleep_for(std::chrono::seconds(5)); } stop(); } TEST(someip_npdu_test, send_different_payloads) { // get the configuration std::shared_ptr its_configuration; auto its_plugin = vsomeip::plugin_manager::get()->get_plugin( vsomeip::plugin_type_e::CONFIGURATION_PLUGIN, VSOMEIP_CFG_LIBRARY); if (its_plugin) { auto its_config_plugin = std::dynamic_pointer_cast(its_plugin); if (its_config_plugin) { its_configuration = its_config_plugin->get_configuration(""); } } if (!its_configuration) { ADD_FAILURE() << "No configuration object. " "Either memory overflow or loading error detected!"; return; } // used to store the debounce times std::array, 4> applicative_debounce; // query the debouncetimes from the configuration. We want to know the // debounce times which the _clients_ of this service have to comply with // when they send requests to this service. // This is necessary as we must ensure a applicative debouncing greater than // debounce time + maximum retention time. Therefore the send threads sleep // for this amount of time after sending a message. for(int service_id = 0; service_id < 4; service_id++) { for(int method_id = 0; method_id < 4; method_id++) { std::chrono::nanoseconds debounce(0), retention(0); its_configuration->get_configured_timing_requests( npdu_test::service_ids[service_id], its_configuration->get_unicast_address(npdu_test::service_ids[service_id], npdu_test::instance_ids[service_id]), its_configuration->get_unreliable_port( npdu_test::service_ids[service_id], npdu_test::instance_ids[service_id]), npdu_test::method_ids[service_id][method_id], &debounce, &retention); if (debounce == std::chrono::nanoseconds(VSOMEIP_DEFAULT_NPDU_DEBOUNCING_NANO) && retention == std::chrono::nanoseconds(VSOMEIP_DEFAULT_NPDU_MAXIMUM_RETENTION_NANO)) { // no timings specified don't don't sleep after sending... applicative_debounce[service_id][method_id] = std::chrono::milliseconds(0); } else { // we add 1 milliseconds to sleep a little bit longer applicative_debounce[service_id][method_id] = std::chrono::duration_cast< std::chrono::milliseconds>(debounce + retention) + std::chrono::milliseconds(1); } } } npdu_test_client test_client_(use_tcp, call_service_sync, sliding_window_size, wait_for_replies, applicative_debounce); test_client_.init(); test_client_.start(); } #ifndef _WIN32 int main(int argc, char** argv) { std::string tcp_enable("--TCP"); std::string udp_enable("--UDP"); std::string sync_enable("--sync"); std::string async_enable("--async"); std::string no_reply_enable("--dont-wait-for-replies"); std::string sliding_window_size_param("--sliding-window-size"); std::string max_payload_size_param("--max-payload-size"); std::string shutdown_service_disable_param("--dont-shutdown-service"); std::string help("--help"); int i = 1; while (i < argc) { if (tcp_enable == argv[i]) { use_tcp = true; } else if (udp_enable == argv[i]) { use_tcp = false; } else if (sync_enable == argv[i]) { call_service_sync = true; } else if (async_enable == argv[i]) { call_service_sync = false; } else if (no_reply_enable == argv[i]) { wait_for_replies = false; } else if (sliding_window_size_param == argv[i] && i + 1 < argc) { i++; std::stringstream converter(argv[i]); converter >> sliding_window_size; } else if (max_payload_size_param == argv[i] && i + 1 < argc) { i++; if (std::string("UDS") == argv[i]) { max_payload_size = payloadsize::UDS; } else if (std::string("TCP") == argv[i]) { max_payload_size = payloadsize::TCP; } else if (std::string("UDP") == argv[i]) { max_payload_size = payloadsize::UDP; } } else if (shutdown_service_disable_param == argv[i]) { shutdown_service_at_end = false; } else if (help == argv[i]) { VSOMEIP_INFO << "Parameters:\n" << "--TCP: Send messages via TCP\n" << "--UDP: Send messages via UDP (default)\n" << "--sync: Wait for acknowledge before sending next message (default)\n" << "--async: Send multiple messages w/o waiting for" " acknowledge of service\n" << "--dont-wait-for-replies: Just send out the messages w/o waiting for " "a reply by the service (use REQUEST_NO_RETURN message type)\n" << "--sliding-window-size: Number of messages to send before waiting " "for acknowledge of service. Default: " << sliding_window_size << "\n" << "--max-payload-size: limit the maximum payloadsize of send requests. One of {" "UDS (=" << VSOMEIP_MAX_LOCAL_MESSAGE_SIZE << "byte), " "UDP (=" << VSOMEIP_MAX_UDP_MESSAGE_SIZE << "byte), " "TCP (=" << VSOMEIP_MAX_TCP_MESSAGE_SIZE << "byte)}, default: UDS\n" << "--dont-shutdown-service: Don't shutdown the service upon " "finishing of the payload test\n" << "--help: print this help"; } i++; } ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } #endif