// Copyright 2022 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. // Simple RPC server with the transfer service registered. Reads HDLC frames // with RPC packets through a socket. This server has a single resource ID that // is available, and data must be written to the server before data can be read // from the resource ID. // // Usage: // // integration_test_server 3300 <<< "resource_id: 12 file: '/tmp/gotbytes'" #include #include #include #include #include #include #include #include #include #include #include #include #include "google/protobuf/text_format.h" #include "pw_assert/check.h" #include "pw_chrono/system_clock.h" #include "pw_log/log.h" #include "pw_rpc_system_server/rpc_server.h" #include "pw_rpc_system_server/socket.h" #include "pw_stream/std_file_stream.h" #include "pw_thread/thread.h" #include "pw_thread_stl/options.h" #include "pw_transfer/integration_test/config.pb.h" #include "pw_transfer/transfer.h" namespace pw::transfer { namespace { using stream::MemoryReader; using stream::MemoryWriter; // This is the maximum size of the socket send buffers. Ideally, this is set // to the lowest allowed value to minimize buffering between the proxy and // clients so rate limiting causes the client to block and wait for the // integration test proxy to drain rather than allowing OS buffers to backlog // large quantities of data. // // Note that the OS may chose to not strictly follow this requested buffer size. // Still, setting this value to be as small as possible does reduce bufer sizes // significantly enough to better reflect typical inter-device communication. // // For this to be effective, servers should also configure their sockets to a // smaller receive buffer size. constexpr int kMaxSocketSendBufferSize = 1; class FileTransferHandler final : public ReadWriteHandler { public: FileTransferHandler(uint32_t resource_id, std::deque&& sources, std::deque&& destinations, std::string default_source_path, std::string default_destination_path, bool offsettable) : ReadWriteHandler(resource_id), sources_(sources), destinations_(destinations), default_source_path_(default_source_path), default_destination_path_(default_destination_path), offsettable(offsettable) {} ~FileTransferHandler() = default; Status PrepareRead() final { if (sources_.empty() && default_source_path_.length() == 0) { PW_LOG_ERROR("Source paths exhausted"); return Status::ResourceExhausted(); } std::string path; if (!sources_.empty()) { path = sources_.front(); sources_.pop_front(); } else { path = default_source_path_; } PW_LOG_DEBUG("Preparing read for file %s", path.c_str()); set_reader(stream_.emplace(path.c_str())); return OkStatus(); } Status PrepareRead(uint32_t offset) final { if (!offsettable) { return Status::Unimplemented(); } if (Status status = PrepareRead(); !status.ok()) { return status; } if (offset > std::get(stream_).ConservativeReadLimit()) { return Status::ResourceExhausted(); } return std::get(stream_).Seek(offset); } void FinalizeRead(Status) final { std::get(stream_).Close(); } Status PrepareWrite() final { if (destinations_.empty() && default_destination_path_.length() == 0) { PW_LOG_ERROR("Destination paths exhausted"); return Status::ResourceExhausted(); } std::string path; if (!destinations_.empty()) { path = destinations_.front(); destinations_.pop_front(); } else { path = default_destination_path_; } PW_LOG_DEBUG("Preparing write for file %s", path.c_str()); set_writer(stream_.emplace(path.c_str())); return OkStatus(); } Status PrepareWrite(uint32_t offset) final { if (!offsettable) { return Status::Unimplemented(); } if (Status status = PrepareWrite(); !status.ok()) { return status; } // It does not appear possible to hit this limit if (offset > std::get(stream_).ConservativeWriteLimit()) { return Status::ResourceExhausted(); } return std::get(stream_).Seek(offset); } Status FinalizeWrite(Status) final { std::get(stream_).Close(); return OkStatus(); } private: std::deque sources_; std::deque destinations_; std::string default_source_path_; std::string default_destination_path_; std::variant stream_; bool offsettable; }; void RunServer(int socket_port, ServerConfig config) { std::vector chunk_buffer(config.chunk_size_bytes()); std::vector encode_buffer(config.chunk_size_bytes()); transfer::Thread<4, 4> transfer_thread(chunk_buffer, encode_buffer); TransferService transfer_service( transfer_thread, config.pending_bytes(), std::chrono::seconds(config.chunk_timeout_seconds()), config.transfer_service_retries(), config.extend_window_divisor()); rpc::system_server::set_socket_port(socket_port); rpc::system_server::Init(); rpc::system_server::Server().RegisterService(transfer_service); // Start transfer thread. pw::Thread transfer_thread_handle(thread::stl::Options(), transfer_thread); int retval = rpc::system_server::SetServerSockOpt(SOL_SOCKET, SO_SNDBUF, &kMaxSocketSendBufferSize, sizeof(kMaxSocketSendBufferSize)); PW_CHECK_INT_EQ(retval, 0, "Failed to configure socket send buffer size with errno=%d", errno); std::vector> handlers; for (const auto& resource : config.resources()) { uint32_t id = resource.first; std::deque source_paths(resource.second.source_paths().begin(), resource.second.source_paths().end()); std::deque destination_paths( resource.second.destination_paths().begin(), resource.second.destination_paths().end()); auto handler = std::make_unique( id, std::move(source_paths), std::move(destination_paths), resource.second.default_source_path(), resource.second.default_destination_path(), resource.second.offsettable()); transfer_service.RegisterHandler(*handler); handlers.push_back(std::move(handler)); } PW_LOG_INFO("Starting pw_rpc server"); PW_CHECK_OK(rpc::system_server::Start()); // Unregister transfer handler before cleaning up the thread since doing so // requires the transfer thread to be running. for (auto& handler : handlers) { transfer_service.UnregisterHandler(*handler); } // End transfer thread. transfer_thread.Terminate(); transfer_thread_handle.join(); } } // namespace } // namespace pw::transfer int main(int argc, char* argv[]) { if (argc != 2) { PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]); return 1; } int port = std::atoi(argv[1]); PW_CHECK_UINT_GT(port, 0, "Invalid port!"); std::string config_string; std::string line; while (std::getline(std::cin, line)) { config_string = config_string + line + '\n'; } pw::transfer::ServerConfig config; bool ok = google::protobuf::TextFormat::ParseFromString(config_string, &config); if (!ok) { PW_LOG_INFO("Failed to parse config: %s", config_string.c_str()); PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]); return 1; } else { PW_LOG_INFO("Server loaded config:\n%s", config.DebugString().c_str()); } pw::transfer::RunServer(port, config); return 0; }