1 // Copyright 2024 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // moqt_ingestion_server is a simple command-line utility that accepts incoming
6 // ANNOUNCE messages and records them into a file.
7
8 #include <sys/stat.h>
9
10 #include <cerrno>
11 #include <cstdint>
12 #include <fstream>
13 #include <ios>
14 #include <memory>
15 #include <optional>
16 #include <string>
17 #include <utility>
18 #include <vector>
19
20 #include "absl/algorithm/container.h"
21 #include "absl/container/node_hash_map.h"
22 #include "absl/functional/bind_front.h"
23 #include "absl/status/status.h"
24 #include "absl/status/statusor.h"
25 #include "absl/strings/ascii.h"
26 #include "absl/strings/str_cat.h"
27 #include "absl/strings/str_split.h"
28 #include "absl/strings/string_view.h"
29 #include "absl/time/clock.h"
30 #include "absl/time/time.h"
31 #include "quiche/quic/moqt/moqt_messages.h"
32 #include "quiche/quic/moqt/moqt_session.h"
33 #include "quiche/quic/moqt/moqt_track.h"
34 #include "quiche/quic/moqt/tools/moqt_server.h"
35 #include "quiche/quic/platform/api/quic_ip_address.h"
36 #include "quiche/quic/platform/api/quic_socket_address.h"
37 #include "quiche/common/platform/api/quiche_command_line_flags.h"
38 #include "quiche/common/platform/api/quiche_default_proof_providers.h"
39 #include "quiche/common/platform/api/quiche_file_utils.h"
40 #include "quiche/common/platform/api/quiche_logging.h"
41 #include "quiche/common/quiche_ip_address.h"
42
43 // Utility code for working with directories.
44 // TODO: make those cross-platform and move into quiche_file_utils.h.
45 namespace {
IsDirectory(absl::string_view path)46 absl::Status IsDirectory(absl::string_view path) {
47 std::string directory(path);
48 struct stat directory_stat;
49 int result = ::stat(directory.c_str(), &directory_stat);
50 if (result != 0) {
51 return absl::ErrnoToStatus(errno, "Failed to stat the directory");
52 }
53 if (!S_ISDIR(directory_stat.st_mode)) {
54 return absl::InvalidArgumentError("Requested path is not a directory");
55 }
56 return absl::OkStatus();
57 }
58
MakeDirectory(absl::string_view path)59 absl::Status MakeDirectory(absl::string_view path) {
60 int result = ::mkdir(std::string(path).c_str(), 0755);
61 if (result != 0) {
62 return absl::ErrnoToStatus(errno, "Failed to create directory");
63 }
64 return absl::OkStatus();
65 }
66 } // namespace
67
// Controls what happens when an announced track namespace contains characters
// outside the allowed set: by default the announce is rejected; when this flag
// is set, the namespace is sanitized and the announce is accepted.
DEFINE_QUICHE_COMMAND_LINE_FLAG(
    bool, allow_invalid_track_namespaces, false,
    "If true, invalid track namespaces will be escaped rather than rejected.");
// Comma-separated track names.  One subscription per entry is issued for every
// newly announced namespace.
DEFINE_QUICHE_COMMAND_LINE_FLAG(
    std::string, tracks, "video,audio",
    "List of track names to request from the peer.");
74
75 namespace moqt {
76 namespace {
77
IsValidTrackNamespaceChar(char c)78 bool IsValidTrackNamespaceChar(char c) {
79 // Since we using track namespaces for directory names, limit the set of
80 // allowed characters.
81 return absl::ascii_isalnum(c) || c == '-' || c == '_';
82 }
83
IsValidTrackNamespace(absl::string_view track_namespace)84 bool IsValidTrackNamespace(absl::string_view track_namespace) {
85 return absl::c_all_of(track_namespace, IsValidTrackNamespaceChar);
86 }
87
CleanUpTrackNamespace(absl::string_view track_namespace)88 std::string CleanUpTrackNamespace(absl::string_view track_namespace) {
89 std::string output(track_namespace);
90 for (char& c : output) {
91 if (!IsValidTrackNamespaceChar(c)) {
92 c = '_';
93 }
94 }
95 return output;
96 }
97
98 // Maintains the state for individual incoming MoQT sessions.
99 class MoqtIngestionHandler {
100 public:
MoqtIngestionHandler(MoqtSession * session,absl::string_view output_root)101 explicit MoqtIngestionHandler(MoqtSession* session,
102 absl::string_view output_root)
103 : session_(session), output_root_(output_root) {
104 session_->callbacks().incoming_announce_callback =
105 absl::bind_front(&MoqtIngestionHandler::OnAnnounceReceived, this);
106 }
107
OnAnnounceReceived(absl::string_view track_namespace)108 std::optional<MoqtAnnounceErrorReason> OnAnnounceReceived(
109 absl::string_view track_namespace) {
110 if (!IsValidTrackNamespace(track_namespace) &&
111 !quiche::GetQuicheCommandLineFlag(
112 FLAGS_allow_invalid_track_namespaces)) {
113 QUICHE_DLOG(WARNING) << "Rejected remote announce as it contained "
114 "disallowed characters; namespace: "
115 << track_namespace;
116 return MoqtAnnounceErrorReason{
117 MoqtAnnounceErrorCode::kInternalError,
118 "Track namespace contains disallowed characters"};
119 }
120
121 std::string directory_name = absl::StrCat(
122 CleanUpTrackNamespace(track_namespace), "_",
123 absl::FormatTime("%Y%m%d_%H%M%S", absl::Now(), absl::UTCTimeZone()));
124 std::string directory_path = quiche::JoinPath(output_root_, directory_name);
125 auto [it, added] = subscribed_namespaces_.emplace(
126 track_namespace, NamespaceHandler(directory_path));
127 if (!added) {
128 // Received before; should be handled by already existing subscriptions.
129 return std::nullopt;
130 }
131
132 if (absl::Status status = MakeDirectory(directory_path); !status.ok()) {
133 subscribed_namespaces_.erase(it);
134 QUICHE_LOG(ERROR) << "Failed to create directory " << directory_path
135 << "; " << status;
136 return MoqtAnnounceErrorReason{MoqtAnnounceErrorCode::kInternalError,
137 "Failed to create output directory"};
138 }
139
140 std::string track_list = quiche::GetQuicheCommandLineFlag(FLAGS_tracks);
141 std::vector<absl::string_view> tracks_to_subscribe =
142 absl::StrSplit(track_list, ',', absl::AllowEmpty());
143 for (absl::string_view track : tracks_to_subscribe) {
144 session_->SubscribeCurrentGroup(track_namespace, track, &it->second);
145 }
146
147 return std::nullopt;
148 }
149
150 private:
151 class NamespaceHandler : public RemoteTrack::Visitor {
152 public:
NamespaceHandler(absl::string_view directory)153 explicit NamespaceHandler(absl::string_view directory)
154 : directory_(directory) {}
155
OnReply(const FullTrackName & full_track_name,std::optional<absl::string_view> error_reason_phrase)156 void OnReply(
157 const FullTrackName& full_track_name,
158 std::optional<absl::string_view> error_reason_phrase) override {
159 if (error_reason_phrase.has_value()) {
160 QUICHE_LOG(ERROR) << "Failed to subscribe to the peer track "
161 << full_track_name.track_namespace << " "
162 << full_track_name.track_name << ": "
163 << *error_reason_phrase;
164 }
165 }
166
OnObjectFragment(const FullTrackName & full_track_name,uint64_t group_sequence,uint64_t object_sequence,uint64_t,MoqtForwardingPreference,absl::string_view object,bool)167 void OnObjectFragment(const FullTrackName& full_track_name,
168 uint64_t group_sequence, uint64_t object_sequence,
169 uint64_t /*object_send_order*/,
170 MoqtForwardingPreference /*forwarding_preference*/,
171 absl::string_view object,
172 bool /*end_of_message*/) override {
173 std::string file_name = absl::StrCat(group_sequence, "-", object_sequence,
174 ".", full_track_name.track_name);
175 std::string file_path = quiche::JoinPath(directory_, file_name);
176 std::ofstream output(file_path, std::ios::binary | std::ios::ate);
177 output.write(object.data(), object.size());
178 output.close();
179 }
180
181 private:
182 std::string directory_;
183 };
184
185 MoqtSession* session_; // Not owned.
186 std::string output_root_;
187 absl::node_hash_map<std::string, NamespaceHandler> subscribed_namespaces_;
188 };
189
IncomingSessionHandler(std::string output_root,absl::string_view path)190 absl::StatusOr<MoqtConfigureSessionCallback> IncomingSessionHandler(
191 std::string output_root, absl::string_view path) {
192 if (path != "/ingest") {
193 return absl::NotFoundError("Unknown endpoint; try \"/ingest\".");
194 }
195 return [output_root](MoqtSession* session) {
196 auto handler = std::make_unique<MoqtIngestionHandler>(session, output_root);
197 session->callbacks().session_deleted_callback = [handler =
198 std::move(handler)] {};
199 };
200 }
201
202 } // namespace
203 } // namespace moqt
204
// IP address, in textual form, of the local interface on which the server
// listens.  It is parsed with QuicheIpAddress::FromString() in main().
DEFINE_QUICHE_COMMAND_LINE_FLAG(std::string, bind_address, "127.0.0.1",
                                "Local IP address to bind to");
// UDP port on which the server listens.
DEFINE_QUICHE_COMMAND_LINE_FLAG(uint16_t, port, 8000,
                                "Port for the server to listen on");
209
main(int argc,char ** argv)210 int main(int argc, char** argv) {
211 const char* usage = "Usage: moqt_ingestion_server [options] output_directory";
212 std::vector<std::string> args =
213 quiche::QuicheParseCommandLineFlags(usage, argc, argv);
214 if (args.size() != 1) {
215 quiche::QuichePrintCommandLineFlagHelp(usage);
216 return 1;
217 }
218
219 std::string output_directory = args[0];
220 if (absl::Status stat_status = IsDirectory(output_directory);
221 !stat_status.ok()) {
222 if (absl::IsNotFound(stat_status)) {
223 absl::Status mkdir_status = MakeDirectory(output_directory);
224 if (!mkdir_status.ok()) {
225 QUICHE_LOG(ERROR) << "Failed to create output directory: "
226 << mkdir_status;
227 return 1;
228 }
229 } else {
230 QUICHE_LOG(ERROR) << stat_status;
231 return 1;
232 }
233 }
234
235 moqt::MoqtServer server(
236 quiche::CreateDefaultProofSource(),
237 absl::bind_front(moqt::IncomingSessionHandler, output_directory));
238 quiche::QuicheIpAddress bind_address;
239 QUICHE_CHECK(bind_address.FromString(
240 quiche::GetQuicheCommandLineFlag(FLAGS_bind_address)));
241 server.quic_server().CreateUDPSocketAndListen(quic::QuicSocketAddress(
242 bind_address, quiche::GetQuicheCommandLineFlag(FLAGS_port)));
243 server.quic_server().HandleEventsForever();
244
245 return 0;
246 }
247