// Copyright 2024 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. // Client binary for the cross-language integration test. // // Usage: // bazel-bin/pw_transfer/integration_test_client 3300 <<< "resource_id: 12 // file: '/tmp/myfile.txt'" // // WORK IN PROGRESS, SEE b/228516801 #include "pw_transfer/client.h" #include #include #include #include "google/protobuf/text_format.h" #include "pw_assert/check.h" #include "pw_log/log.h" #include "pw_rpc/channel.h" #include "pw_rpc/integration_testing.h" #include "pw_status/status.h" #include "pw_status/try.h" #include "pw_stream/std_file_stream.h" #include "pw_sync/binary_semaphore.h" #include "pw_thread/thread.h" #include "pw_thread_stl/options.h" #include "pw_transfer/integration_test/config.pb.h" #include "pw_transfer/transfer_thread.h" namespace pw::transfer::integration_test { namespace { // This is the maximum size of the socket send buffers. Ideally, this is set // to the lowest allowed value to minimize buffering between the proxy and // clients so rate limiting causes the client to block and wait for the // integration test proxy to drain rather than allowing OS buffers to backlog // large quantities of data. // // Note that the OS may chose to not strictly follow this requested buffer size. // Still, setting this value to be as small as possible does reduce bufer sizes // significantly enough to better reflect typical inter-device communication. // // For this to be effective, servers should also configure their sockets to a // smaller receive buffer size. constexpr int kMaxSocketSendBufferSize = 1; constexpr size_t kDefaultMaxWindowSizeBytes = 16384; thread::Options& TransferThreadOptions() { static thread::stl::Options options; return options; } // Transfer status, valid only after semaphore is acquired. // // We need to bundle the status and semaphore together because a pw_function // callback can at most capture the reference to one variable (and we need to // both set the status and release the semaphore). struct TransferResult { Status status = Status::Unknown(); sync::BinarySemaphore completed; }; // Create a pw_transfer client and perform the transfer actions. pw::Status PerformTransferActions(const pw::transfer::ClientConfig& config) { constexpr size_t kMaxPayloadSize = rpc::MaxSafePayloadSize(); std::byte chunk_buffer[kMaxPayloadSize]; std::byte encode_buffer[kMaxPayloadSize]; transfer::Thread<2, 2> transfer_thread(chunk_buffer, encode_buffer); pw::Thread system_thread(TransferThreadOptions(), transfer_thread); // As much as we don't want to dynamically allocate an array, // variable length arrays (VLA) are nonstandard, and a std::vector could cause // references to go stale if the vector's underlying buffer is resized. This // array of TransferResults needs to outlive the loop that performs the // actual transfer actions due to how some references to TransferResult // may persist beyond the lifetime of a transfer. const int num_actions = config.transfer_actions().size(); auto transfer_results = std::make_unique(num_actions); pw::transfer::Client client(rpc::integration_test::client(), rpc::integration_test::kChannelId, transfer_thread, kDefaultMaxWindowSizeBytes); // TODO: https://pwbug.dev/357145010 - Don't IgnoreError here. client.set_max_retries(config.max_retries()).IgnoreError(); client.set_max_lifetime_retries(config.max_lifetime_retries()).IgnoreError(); Status status = pw::OkStatus(); for (int i = 0; i < num_actions; i++) { const pw::transfer::TransferAction& action = config.transfer_actions()[i]; TransferResult& result = transfer_results[i]; // If no protocol version is specified, default to the latest version. pw::transfer::ProtocolVersion protocol_version = action.protocol_version() == pw::transfer::TransferAction::ProtocolVersion:: TransferAction_ProtocolVersion_UNKNOWN_VERSION ? pw::transfer::ProtocolVersion::kLatest : static_cast( action.protocol_version()); if (action.transfer_type() == pw::transfer::TransferAction::TransferType:: TransferAction_TransferType_WRITE_TO_SERVER) { pw::stream::StdFileReader input(action.file_path().c_str()); pw::Result handle = client.Write( action.resource_id(), input, [&result](Status status) { result.status = status; result.completed.release(); }, protocol_version, pw::transfer::cfg::kDefaultClientTimeout, pw::transfer::cfg::kDefaultInitialChunkTimeout, action.initial_offset()); if (handle.ok()) { // Wait for the transfer to complete. We need to do this here so that // the StdFileReader doesn't go out of scope. result.completed.acquire(); } else { result.status = handle.status(); } input.Close(); } else if (action.transfer_type() == pw::transfer::TransferAction::TransferType:: TransferAction_TransferType_READ_FROM_SERVER) { pw::stream::StdFileWriter output(action.file_path().c_str()); pw::Result handle = client.Read( action.resource_id(), output, [&result](Status status) { result.status = status; result.completed.release(); }, protocol_version, pw::transfer::cfg::kDefaultClientTimeout, pw::transfer::cfg::kDefaultInitialChunkTimeout, action.initial_offset()); if (handle.ok()) { // Wait for the transfer to complete. result.completed.acquire(); } else { result.status = handle.status(); } output.Close(); } else { PW_LOG_ERROR("Unrecognized transfer action type %d", action.transfer_type()); status = pw::Status::InvalidArgument(); break; } if (int(result.status.code()) != int(action.expected_status())) { PW_LOG_ERROR("Failed to perform action:\n%s", action.DebugString().c_str()); status = result.status.ok() ? Status::Unknown() : result.status; break; } } transfer_thread.Terminate(); system_thread.join(); // The RPC thread must join before destroying transfer objects as the transfer // service may still reference the transfer thread or transfer client objects. pw::rpc::integration_test::TerminateClient(); return status; } } // namespace } // namespace pw::transfer::integration_test int main(int argc, char* argv[]) { if (argc < 2) { PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]); return 1; } const int port = std::atoi(argv[1]); std::string config_string; std::string line; while (std::getline(std::cin, line)) { config_string = config_string + line + '\n'; } pw::transfer::ClientConfig config; bool ok = google::protobuf::TextFormat::ParseFromString(config_string, &config); if (!ok) { PW_LOG_INFO("Failed to parse config: %s", config_string.c_str()); PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]); return 1; } else { PW_LOG_INFO("Client loaded config:\n%s", config.DebugString().c_str()); } if (!pw::rpc::integration_test::InitializeClient(port).ok()) { return 1; } int retval = pw::rpc::integration_test::SetClientSockOpt( SOL_SOCKET, SO_SNDBUF, &pw::transfer::integration_test::kMaxSocketSendBufferSize, sizeof(pw::transfer::integration_test::kMaxSocketSendBufferSize)); PW_CHECK_INT_EQ(retval, 0, "Failed to configure socket send buffer size with errno=%d", errno); if (!pw::transfer::integration_test::PerformTransferActions(config).ok()) { PW_LOG_INFO("Failed to transfer!"); return 1; } return 0; }