xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2024 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // moqt_ingestion_server is a simple command-line utility that accepts incoming
6 // ANNOUNCE messages and records them into a file.
7 
8 #include <sys/stat.h>
9 
10 #include <cerrno>
11 #include <cstdint>
12 #include <fstream>
13 #include <ios>
14 #include <memory>
15 #include <optional>
16 #include <string>
17 #include <utility>
18 #include <vector>
19 
20 #include "absl/algorithm/container.h"
21 #include "absl/container/node_hash_map.h"
22 #include "absl/functional/bind_front.h"
23 #include "absl/status/status.h"
24 #include "absl/status/statusor.h"
25 #include "absl/strings/ascii.h"
26 #include "absl/strings/str_cat.h"
27 #include "absl/strings/str_split.h"
28 #include "absl/strings/string_view.h"
29 #include "absl/time/clock.h"
30 #include "absl/time/time.h"
31 #include "quiche/quic/moqt/moqt_messages.h"
32 #include "quiche/quic/moqt/moqt_session.h"
33 #include "quiche/quic/moqt/moqt_track.h"
34 #include "quiche/quic/moqt/tools/moqt_server.h"
35 #include "quiche/quic/platform/api/quic_ip_address.h"
36 #include "quiche/quic/platform/api/quic_socket_address.h"
37 #include "quiche/common/platform/api/quiche_command_line_flags.h"
38 #include "quiche/common/platform/api/quiche_default_proof_providers.h"
39 #include "quiche/common/platform/api/quiche_file_utils.h"
40 #include "quiche/common/platform/api/quiche_logging.h"
41 #include "quiche/common/quiche_ip_address.h"
42 
43 // Utility code for working with directories.
44 // TODO: make those cross-platform and move into quiche_file_utils.h.
45 namespace {
IsDirectory(absl::string_view path)46 absl::Status IsDirectory(absl::string_view path) {
47   std::string directory(path);
48   struct stat directory_stat;
49   int result = ::stat(directory.c_str(), &directory_stat);
50   if (result != 0) {
51     return absl::ErrnoToStatus(errno, "Failed to stat the directory");
52   }
53   if (!S_ISDIR(directory_stat.st_mode)) {
54     return absl::InvalidArgumentError("Requested path is not a directory");
55   }
56   return absl::OkStatus();
57 }
58 
MakeDirectory(absl::string_view path)59 absl::Status MakeDirectory(absl::string_view path) {
60   int result = ::mkdir(std::string(path).c_str(), 0755);
61   if (result != 0) {
62     return absl::ErrnoToStatus(errno, "Failed to create directory");
63   }
64   return absl::OkStatus();
65 }
66 }  // namespace
67 
68 DEFINE_QUICHE_COMMAND_LINE_FLAG(
69     bool, allow_invalid_track_namespaces, false,
70     "If true, invalid track namespaces will be escaped rather than rejected.");
71 DEFINE_QUICHE_COMMAND_LINE_FLAG(
72     std::string, tracks, "video,audio",
73     "List of track names to request from the peer.");
74 
75 namespace moqt {
76 namespace {
77 
IsValidTrackNamespaceChar(char c)78 bool IsValidTrackNamespaceChar(char c) {
79   // Since we using track namespaces for directory names, limit the set of
80   // allowed characters.
81   return absl::ascii_isalnum(c) || c == '-' || c == '_';
82 }
83 
IsValidTrackNamespace(absl::string_view track_namespace)84 bool IsValidTrackNamespace(absl::string_view track_namespace) {
85   return absl::c_all_of(track_namespace, IsValidTrackNamespaceChar);
86 }
87 
CleanUpTrackNamespace(absl::string_view track_namespace)88 std::string CleanUpTrackNamespace(absl::string_view track_namespace) {
89   std::string output(track_namespace);
90   for (char& c : output) {
91     if (!IsValidTrackNamespaceChar(c)) {
92       c = '_';
93     }
94   }
95   return output;
96 }
97 
98 // Maintains the state for individual incoming MoQT sessions.
99 class MoqtIngestionHandler {
100  public:
MoqtIngestionHandler(MoqtSession * session,absl::string_view output_root)101   explicit MoqtIngestionHandler(MoqtSession* session,
102                                 absl::string_view output_root)
103       : session_(session), output_root_(output_root) {
104     session_->callbacks().incoming_announce_callback =
105         absl::bind_front(&MoqtIngestionHandler::OnAnnounceReceived, this);
106   }
107 
OnAnnounceReceived(absl::string_view track_namespace)108   std::optional<MoqtAnnounceErrorReason> OnAnnounceReceived(
109       absl::string_view track_namespace) {
110     if (!IsValidTrackNamespace(track_namespace) &&
111         !quiche::GetQuicheCommandLineFlag(
112             FLAGS_allow_invalid_track_namespaces)) {
113       QUICHE_DLOG(WARNING) << "Rejected remote announce as it contained "
114                               "disallowed characters; namespace: "
115                            << track_namespace;
116       return MoqtAnnounceErrorReason{
117           MoqtAnnounceErrorCode::kInternalError,
118           "Track namespace contains disallowed characters"};
119     }
120 
121     std::string directory_name = absl::StrCat(
122         CleanUpTrackNamespace(track_namespace), "_",
123         absl::FormatTime("%Y%m%d_%H%M%S", absl::Now(), absl::UTCTimeZone()));
124     std::string directory_path = quiche::JoinPath(output_root_, directory_name);
125     auto [it, added] = subscribed_namespaces_.emplace(
126         track_namespace, NamespaceHandler(directory_path));
127     if (!added) {
128       // Received before; should be handled by already existing subscriptions.
129       return std::nullopt;
130     }
131 
132     if (absl::Status status = MakeDirectory(directory_path); !status.ok()) {
133       subscribed_namespaces_.erase(it);
134       QUICHE_LOG(ERROR) << "Failed to create directory " << directory_path
135                         << "; " << status;
136       return MoqtAnnounceErrorReason{MoqtAnnounceErrorCode::kInternalError,
137                                      "Failed to create output directory"};
138     }
139 
140     std::string track_list = quiche::GetQuicheCommandLineFlag(FLAGS_tracks);
141     std::vector<absl::string_view> tracks_to_subscribe =
142         absl::StrSplit(track_list, ',', absl::AllowEmpty());
143     for (absl::string_view track : tracks_to_subscribe) {
144       session_->SubscribeCurrentGroup(track_namespace, track, &it->second);
145     }
146 
147     return std::nullopt;
148   }
149 
150  private:
151   class NamespaceHandler : public RemoteTrack::Visitor {
152    public:
NamespaceHandler(absl::string_view directory)153     explicit NamespaceHandler(absl::string_view directory)
154         : directory_(directory) {}
155 
OnReply(const FullTrackName & full_track_name,std::optional<absl::string_view> error_reason_phrase)156     void OnReply(
157         const FullTrackName& full_track_name,
158         std::optional<absl::string_view> error_reason_phrase) override {
159       if (error_reason_phrase.has_value()) {
160         QUICHE_LOG(ERROR) << "Failed to subscribe to the peer track "
161                           << full_track_name.track_namespace << " "
162                           << full_track_name.track_name << ": "
163                           << *error_reason_phrase;
164       }
165     }
166 
OnObjectFragment(const FullTrackName & full_track_name,uint64_t group_sequence,uint64_t object_sequence,uint64_t,MoqtForwardingPreference,absl::string_view object,bool)167     void OnObjectFragment(const FullTrackName& full_track_name,
168                           uint64_t group_sequence, uint64_t object_sequence,
169                           uint64_t /*object_send_order*/,
170                           MoqtForwardingPreference /*forwarding_preference*/,
171                           absl::string_view object,
172                           bool /*end_of_message*/) override {
173       std::string file_name = absl::StrCat(group_sequence, "-", object_sequence,
174                                            ".", full_track_name.track_name);
175       std::string file_path = quiche::JoinPath(directory_, file_name);
176       std::ofstream output(file_path, std::ios::binary | std::ios::ate);
177       output.write(object.data(), object.size());
178       output.close();
179     }
180 
181    private:
182     std::string directory_;
183   };
184 
185   MoqtSession* session_;  // Not owned.
186   std::string output_root_;
187   absl::node_hash_map<std::string, NamespaceHandler> subscribed_namespaces_;
188 };
189 
IncomingSessionHandler(std::string output_root,absl::string_view path)190 absl::StatusOr<MoqtConfigureSessionCallback> IncomingSessionHandler(
191     std::string output_root, absl::string_view path) {
192   if (path != "/ingest") {
193     return absl::NotFoundError("Unknown endpoint; try \"/ingest\".");
194   }
195   return [output_root](MoqtSession* session) {
196     auto handler = std::make_unique<MoqtIngestionHandler>(session, output_root);
197     session->callbacks().session_deleted_callback = [handler =
198                                                          std::move(handler)] {};
199   };
200 }
201 
202 }  // namespace
203 }  // namespace moqt
204 
205 DEFINE_QUICHE_COMMAND_LINE_FLAG(std::string, bind_address, "127.0.0.1",
206                                 "Local IP address to bind to");
207 DEFINE_QUICHE_COMMAND_LINE_FLAG(uint16_t, port, 8000,
208                                 "Port for the server to listen on");
209 
main(int argc,char ** argv)210 int main(int argc, char** argv) {
211   const char* usage = "Usage: moqt_ingestion_server [options] output_directory";
212   std::vector<std::string> args =
213       quiche::QuicheParseCommandLineFlags(usage, argc, argv);
214   if (args.size() != 1) {
215     quiche::QuichePrintCommandLineFlagHelp(usage);
216     return 1;
217   }
218 
219   std::string output_directory = args[0];
220   if (absl::Status stat_status = IsDirectory(output_directory);
221       !stat_status.ok()) {
222     if (absl::IsNotFound(stat_status)) {
223       absl::Status mkdir_status = MakeDirectory(output_directory);
224       if (!mkdir_status.ok()) {
225         QUICHE_LOG(ERROR) << "Failed to create output directory: "
226                           << mkdir_status;
227         return 1;
228       }
229     } else {
230       QUICHE_LOG(ERROR) << stat_status;
231       return 1;
232     }
233   }
234 
235   moqt::MoqtServer server(
236       quiche::CreateDefaultProofSource(),
237       absl::bind_front(moqt::IncomingSessionHandler, output_directory));
238   quiche::QuicheIpAddress bind_address;
239   QUICHE_CHECK(bind_address.FromString(
240       quiche::GetQuicheCommandLineFlag(FLAGS_bind_address)));
241   server.quic_server().CreateUDPSocketAndListen(quic::QuicSocketAddress(
242       bind_address, quiche::GetQuicheCommandLineFlag(FLAGS_port)));
243   server.quic_server().HandleEventsForever();
244 
245   return 0;
246 }
247