1 // Copyright (c) 2021 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_ 6 #define QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_ 7 8 #include <string> 9 10 #include "absl/status/status.h" 11 #include "quiche/quic/core/web_transport_interface.h" 12 #include "quiche/quic/platform/api/quic_logging.h" 13 #include "quiche/common/platform/api/quiche_logging.h" 14 #include "quiche/common/quiche_circular_deque.h" 15 #include "quiche/common/quiche_stream.h" 16 #include "quiche/common/simple_buffer_allocator.h" 17 #include "quiche/web_transport/complete_buffer_visitor.h" 18 #include "quiche/web_transport/web_transport.h" 19 20 namespace quic { 21 22 // Discards any incoming data. 23 class WebTransportDiscardVisitor : public WebTransportStreamVisitor { 24 public: WebTransportDiscardVisitor(WebTransportStream * stream)25 WebTransportDiscardVisitor(WebTransportStream* stream) : stream_(stream) {} 26 OnCanRead()27 void OnCanRead() override { 28 std::string buffer; 29 WebTransportStream::ReadResult result = stream_->Read(&buffer); 30 QUIC_DVLOG(2) << "Read " << result.bytes_read 31 << " bytes from WebTransport stream " 32 << stream_->GetStreamId() << ", fin: " << result.fin; 33 } 34 OnCanWrite()35 void OnCanWrite() override {} 36 OnResetStreamReceived(WebTransportStreamError)37 void OnResetStreamReceived(WebTransportStreamError /*error*/) override {} OnStopSendingReceived(WebTransportStreamError)38 void OnStopSendingReceived(WebTransportStreamError /*error*/) override {} OnWriteSideInDataRecvdState()39 void OnWriteSideInDataRecvdState() override {} 40 41 private: 42 WebTransportStream* stream_; 43 }; 44 45 class DiscardWebTransportSessionVisitor : public WebTransportVisitor { 46 public: DiscardWebTransportSessionVisitor(WebTransportSession * session)47 DiscardWebTransportSessionVisitor(WebTransportSession* session) 48 : session_(session) {} 49 OnSessionReady()50 void OnSessionReady() override {} OnSessionClosed(WebTransportSessionError,const std::string &)51 void OnSessionClosed(WebTransportSessionError /*error_code*/, 52 const std::string& /*error_message*/) override {} 53 OnIncomingBidirectionalStreamAvailable()54 void OnIncomingBidirectionalStreamAvailable() override { 55 while (true) { 56 WebTransportStream* stream = 57 session_->AcceptIncomingBidirectionalStream(); 58 if (stream == nullptr) { 59 return; 60 } 61 stream->SetVisitor(std::make_unique<WebTransportDiscardVisitor>(stream)); 62 stream->visitor()->OnCanRead(); 63 } 64 } 65 OnIncomingUnidirectionalStreamAvailable()66 void OnIncomingUnidirectionalStreamAvailable() override { 67 while (true) { 68 WebTransportStream* stream = 69 session_->AcceptIncomingUnidirectionalStream(); 70 if (stream == nullptr) { 71 return; 72 } 73 stream->SetVisitor(std::make_unique<WebTransportDiscardVisitor>(stream)); 74 stream->visitor()->OnCanRead(); 75 } 76 } 77 OnDatagramReceived(absl::string_view)78 void OnDatagramReceived(absl::string_view) override {} OnCanCreateNewOutgoingBidirectionalStream()79 void OnCanCreateNewOutgoingBidirectionalStream() override {} OnCanCreateNewOutgoingUnidirectionalStream()80 void OnCanCreateNewOutgoingUnidirectionalStream() override {} 81 82 private: 83 webtransport::Session* session_; 84 }; 85 86 // Echoes any incoming data back on the same stream. 87 class WebTransportBidirectionalEchoVisitor : public WebTransportStreamVisitor { 88 public: WebTransportBidirectionalEchoVisitor(WebTransportStream * stream)89 WebTransportBidirectionalEchoVisitor(WebTransportStream* stream) 90 : stream_(stream) {} 91 OnCanRead()92 void OnCanRead() override { 93 WebTransportStream::ReadResult result = stream_->Read(&buffer_); 94 QUIC_DVLOG(1) << "Attempted reading on WebTransport bidirectional stream " 95 << stream_->GetStreamId() 96 << ", bytes read: " << result.bytes_read; 97 if (result.fin) { 98 send_fin_ = true; 99 } 100 OnCanWrite(); 101 } 102 OnCanWrite()103 void OnCanWrite() override { 104 if (stop_sending_received_) { 105 return; 106 } 107 108 if (!buffer_.empty()) { 109 absl::Status status = quiche::WriteIntoStream(*stream_, buffer_); 110 QUIC_DVLOG(1) << "Attempted writing on WebTransport bidirectional stream " 111 << stream_->GetStreamId() << ", success: " << status; 112 if (!status.ok()) { 113 return; 114 } 115 116 buffer_ = ""; 117 } 118 119 if (send_fin_ && !fin_sent_) { 120 absl::Status status = quiche::SendFinOnStream(*stream_); 121 if (status.ok()) { 122 fin_sent_ = true; 123 } 124 } 125 } 126 OnResetStreamReceived(WebTransportStreamError)127 void OnResetStreamReceived(WebTransportStreamError /*error*/) override { 128 // Send FIN in response to a stream reset. We want to test that we can 129 // operate one side of the stream cleanly while the other is reset, thus 130 // replying with a FIN rather than a RESET_STREAM is more appropriate here. 131 send_fin_ = true; 132 OnCanWrite(); 133 } OnStopSendingReceived(WebTransportStreamError)134 void OnStopSendingReceived(WebTransportStreamError /*error*/) override { 135 stop_sending_received_ = true; 136 } OnWriteSideInDataRecvdState()137 void OnWriteSideInDataRecvdState() override {} 138 139 protected: stream()140 WebTransportStream* stream() { return stream_; } 141 142 private: 143 WebTransportStream* stream_; 144 std::string buffer_; 145 bool send_fin_ = false; 146 bool fin_sent_ = false; 147 bool stop_sending_received_ = false; 148 }; 149 150 using WebTransportUnidirectionalEchoReadVisitor = 151 ::webtransport::CompleteBufferVisitor; 152 using WebTransportUnidirectionalEchoWriteVisitor = 153 ::webtransport::CompleteBufferVisitor; 154 155 // A session visitor which sets unidirectional or bidirectional stream visitors 156 // to echo. 157 class EchoWebTransportSessionVisitor : public WebTransportVisitor { 158 public: 159 EchoWebTransportSessionVisitor(WebTransportSession* session, 160 bool open_server_initiated_echo_stream = true) session_(session)161 : session_(session), 162 echo_stream_opened_(!open_server_initiated_echo_stream) {} 163 OnSessionReady()164 void OnSessionReady() override { 165 if (session_->CanOpenNextOutgoingBidirectionalStream()) { 166 OnCanCreateNewOutgoingBidirectionalStream(); 167 } 168 } 169 OnSessionClosed(WebTransportSessionError,const std::string &)170 void OnSessionClosed(WebTransportSessionError /*error_code*/, 171 const std::string& /*error_message*/) override {} 172 OnIncomingBidirectionalStreamAvailable()173 void OnIncomingBidirectionalStreamAvailable() override { 174 while (true) { 175 WebTransportStream* stream = 176 session_->AcceptIncomingBidirectionalStream(); 177 if (stream == nullptr) { 178 return; 179 } 180 QUIC_DVLOG(1) 181 << "EchoWebTransportSessionVisitor received a bidirectional stream " 182 << stream->GetStreamId(); 183 stream->SetVisitor( 184 std::make_unique<WebTransportBidirectionalEchoVisitor>(stream)); 185 stream->visitor()->OnCanRead(); 186 } 187 } 188 OnIncomingUnidirectionalStreamAvailable()189 void OnIncomingUnidirectionalStreamAvailable() override { 190 while (true) { 191 WebTransportStream* stream = 192 session_->AcceptIncomingUnidirectionalStream(); 193 if (stream == nullptr) { 194 return; 195 } 196 QUIC_DVLOG(1) 197 << "EchoWebTransportSessionVisitor received a unidirectional stream"; 198 stream->SetVisitor( 199 std::make_unique<WebTransportUnidirectionalEchoReadVisitor>( 200 stream, [this](const std::string& data) { 201 streams_to_echo_back_.push_back(data); 202 TrySendingUnidirectionalStreams(); 203 })); 204 stream->visitor()->OnCanRead(); 205 } 206 } 207 OnDatagramReceived(absl::string_view datagram)208 void OnDatagramReceived(absl::string_view datagram) override { 209 session_->SendOrQueueDatagram(datagram); 210 } 211 OnCanCreateNewOutgoingBidirectionalStream()212 void OnCanCreateNewOutgoingBidirectionalStream() override { 213 if (!echo_stream_opened_) { 214 WebTransportStream* stream = session_->OpenOutgoingBidirectionalStream(); 215 stream->SetVisitor( 216 std::make_unique<WebTransportBidirectionalEchoVisitor>(stream)); 217 echo_stream_opened_ = true; 218 } 219 } OnCanCreateNewOutgoingUnidirectionalStream()220 void OnCanCreateNewOutgoingUnidirectionalStream() override { 221 TrySendingUnidirectionalStreams(); 222 } 223 TrySendingUnidirectionalStreams()224 void TrySendingUnidirectionalStreams() { 225 while (!streams_to_echo_back_.empty() && 226 session_->CanOpenNextOutgoingUnidirectionalStream()) { 227 QUIC_DVLOG(1) 228 << "EchoWebTransportServer echoed a unidirectional stream back"; 229 WebTransportStream* stream = session_->OpenOutgoingUnidirectionalStream(); 230 stream->SetVisitor( 231 std::make_unique<WebTransportUnidirectionalEchoWriteVisitor>( 232 stream, streams_to_echo_back_.front())); 233 streams_to_echo_back_.pop_front(); 234 stream->visitor()->OnCanWrite(); 235 } 236 } 237 238 private: 239 WebTransportSession* session_; 240 quiche::SimpleBufferAllocator allocator_; 241 bool echo_stream_opened_; 242 243 quiche::QuicheCircularDeque<std::string> streams_to_echo_back_; 244 }; 245 246 } // namespace quic 247 248 #endif // QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_ 249