1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/transport/http_connect_handshaker.h"
22
23 #include <limits.h>
24 #include <string.h>
25
26 #include <memory>
27 #include <string>
28
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34
35 #include <grpc/slice.h>
36 #include <grpc/slice_buffer.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/config/core_configuration.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/debug_location.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/gprpp/sync.h"
46 #include "src/core/lib/http/format_request.h"
47 #include "src/core/lib/http/parser.h"
48 #include "src/core/lib/iomgr/closure.h"
49 #include "src/core/lib/iomgr/endpoint.h"
50 #include "src/core/lib/iomgr/error.h"
51 #include "src/core/lib/iomgr/exec_ctx.h"
52 #include "src/core/lib/iomgr/iomgr_fwd.h"
53 #include "src/core/lib/iomgr/tcp_server.h"
54 #include "src/core/lib/transport/handshaker.h"
55 #include "src/core/lib/transport/handshaker_factory.h"
56 #include "src/core/lib/transport/handshaker_registry.h"
57
58 namespace grpc_core {
59
60 namespace {
61
62 class HttpConnectHandshaker : public Handshaker {
63 public:
64 HttpConnectHandshaker();
65 void Shutdown(grpc_error_handle why) override;
66 void DoHandshake(grpc_tcp_server_acceptor* acceptor,
67 grpc_closure* on_handshake_done,
68 HandshakerArgs* args) override;
name() const69 const char* name() const override { return "http_connect"; }
70
71 private:
72 ~HttpConnectHandshaker() override;
73 void CleanupArgsForFailureLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
74 void HandshakeFailedLocked(grpc_error_handle error)
75 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
76 static void OnWriteDone(void* arg, grpc_error_handle error);
77 static void OnReadDone(void* arg, grpc_error_handle error);
78 static void OnWriteDoneScheduler(void* arg, grpc_error_handle error);
79 static void OnReadDoneScheduler(void* arg, grpc_error_handle error);
80
81 Mutex mu_;
82
83 bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
84 // Endpoint and read buffer to destroy after a shutdown.
85 grpc_endpoint* endpoint_to_destroy_ ABSL_GUARDED_BY(mu_) = nullptr;
86 grpc_slice_buffer* read_buffer_to_destroy_ ABSL_GUARDED_BY(mu_) = nullptr;
87
88 // State saved while performing the handshake.
89 HandshakerArgs* args_ = nullptr;
90 grpc_closure* on_handshake_done_ = nullptr;
91
92 // Objects for processing the HTTP CONNECT request and response.
93 grpc_slice_buffer write_buffer_ ABSL_GUARDED_BY(mu_);
94 grpc_closure request_done_closure_ ABSL_GUARDED_BY(mu_);
95 grpc_closure response_read_closure_ ABSL_GUARDED_BY(mu_);
96 grpc_http_parser http_parser_ ABSL_GUARDED_BY(mu_);
97 grpc_http_response http_response_ ABSL_GUARDED_BY(mu_);
98 };
99
~HttpConnectHandshaker()100 HttpConnectHandshaker::~HttpConnectHandshaker() {
101 if (endpoint_to_destroy_ != nullptr) {
102 grpc_endpoint_destroy(endpoint_to_destroy_);
103 }
104 if (read_buffer_to_destroy_ != nullptr) {
105 grpc_slice_buffer_destroy(read_buffer_to_destroy_);
106 gpr_free(read_buffer_to_destroy_);
107 }
108 grpc_slice_buffer_destroy(&write_buffer_);
109 grpc_http_parser_destroy(&http_parser_);
110 grpc_http_response_destroy(&http_response_);
111 }
112
113 // Set args fields to nullptr, saving the endpoint and read buffer for
114 // later destruction.
CleanupArgsForFailureLocked()115 void HttpConnectHandshaker::CleanupArgsForFailureLocked() {
116 endpoint_to_destroy_ = args_->endpoint;
117 args_->endpoint = nullptr;
118 read_buffer_to_destroy_ = args_->read_buffer;
119 args_->read_buffer = nullptr;
120 args_->args = ChannelArgs();
121 }
122
123 // If the handshake failed or we're shutting down, clean up and invoke the
124 // callback with the error.
HandshakeFailedLocked(grpc_error_handle error)125 void HttpConnectHandshaker::HandshakeFailedLocked(grpc_error_handle error) {
126 if (error.ok()) {
127 // If we were shut down after an endpoint operation succeeded but
128 // before the endpoint callback was invoked, we need to generate our
129 // own error.
130 error = GRPC_ERROR_CREATE("Handshaker shutdown");
131 }
132 if (!is_shutdown_) {
133 // TODO(ctiller): It is currently necessary to shutdown endpoints
134 // before destroying them, even if we know that there are no
135 // pending read/write callbacks. This should be fixed, at which
136 // point this can be removed.
137 grpc_endpoint_shutdown(args_->endpoint, error);
138 // Not shutting down, so the handshake failed. Clean up before
139 // invoking the callback.
140 CleanupArgsForFailureLocked();
141 // Set shutdown to true so that subsequent calls to
142 // http_connect_handshaker_shutdown() do nothing.
143 is_shutdown_ = true;
144 }
145 // Invoke callback.
146 ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, error);
147 }
148
149 // This callback can be invoked inline while already holding onto the mutex. To
150 // avoid deadlocks, schedule OnWriteDone on ExecCtx.
OnWriteDoneScheduler(void * arg,grpc_error_handle error)151 void HttpConnectHandshaker::OnWriteDoneScheduler(void* arg,
152 grpc_error_handle error) {
153 auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
154 ExecCtx::Run(DEBUG_LOCATION,
155 GRPC_CLOSURE_INIT(&handshaker->request_done_closure_,
156 &HttpConnectHandshaker::OnWriteDone,
157 handshaker, grpc_schedule_on_exec_ctx),
158 error);
159 }
160
161 // Callback invoked when finished writing HTTP CONNECT request.
OnWriteDone(void * arg,grpc_error_handle error)162 void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error_handle error) {
163 auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
164 ReleasableMutexLock lock(&handshaker->mu_);
165 if (!error.ok() || handshaker->is_shutdown_) {
166 // If the write failed or we're shutting down, clean up and invoke the
167 // callback with the error.
168 handshaker->HandshakeFailedLocked(error);
169 lock.Release();
170 handshaker->Unref();
171 } else {
172 // Otherwise, read the response.
173 // The read callback inherits our ref to the handshaker.
174 grpc_endpoint_read(
175 handshaker->args_->endpoint, handshaker->args_->read_buffer,
176 GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
177 &HttpConnectHandshaker::OnReadDoneScheduler,
178 handshaker, grpc_schedule_on_exec_ctx),
179 /*urgent=*/true, /*min_progress_size=*/1);
180 }
181 }
182
183 // This callback can be invoked inline while already holding onto the mutex. To
184 // avoid deadlocks, schedule OnReadDone on ExecCtx.
OnReadDoneScheduler(void * arg,grpc_error_handle error)185 void HttpConnectHandshaker::OnReadDoneScheduler(void* arg,
186 grpc_error_handle error) {
187 auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
188 ExecCtx::Run(DEBUG_LOCATION,
189 GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
190 &HttpConnectHandshaker::OnReadDone, handshaker,
191 grpc_schedule_on_exec_ctx),
192 error);
193 }
194
195 // Callback invoked for reading HTTP CONNECT response.
OnReadDone(void * arg,grpc_error_handle error)196 void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error_handle error) {
197 auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
198 ReleasableMutexLock lock(&handshaker->mu_);
199 if (!error.ok() || handshaker->is_shutdown_) {
200 // If the read failed or we're shutting down, clean up and invoke the
201 // callback with the error.
202 handshaker->HandshakeFailedLocked(error);
203 goto done;
204 }
205 // Add buffer to parser.
206 for (size_t i = 0; i < handshaker->args_->read_buffer->count; ++i) {
207 if (GRPC_SLICE_LENGTH(handshaker->args_->read_buffer->slices[i]) > 0) {
208 size_t body_start_offset = 0;
209 error = grpc_http_parser_parse(&handshaker->http_parser_,
210 handshaker->args_->read_buffer->slices[i],
211 &body_start_offset);
212 if (!error.ok()) {
213 handshaker->HandshakeFailedLocked(error);
214 goto done;
215 }
216 if (handshaker->http_parser_.state == GRPC_HTTP_BODY) {
217 // Remove the data we've already read from the read buffer,
218 // leaving only the leftover bytes (if any).
219 grpc_slice_buffer tmp_buffer;
220 grpc_slice_buffer_init(&tmp_buffer);
221 if (body_start_offset <
222 GRPC_SLICE_LENGTH(handshaker->args_->read_buffer->slices[i])) {
223 grpc_slice_buffer_add(
224 &tmp_buffer,
225 grpc_slice_split_tail(&handshaker->args_->read_buffer->slices[i],
226 body_start_offset));
227 }
228 grpc_slice_buffer_addn(&tmp_buffer,
229 &handshaker->args_->read_buffer->slices[i + 1],
230 handshaker->args_->read_buffer->count - i - 1);
231 grpc_slice_buffer_swap(handshaker->args_->read_buffer, &tmp_buffer);
232 grpc_slice_buffer_destroy(&tmp_buffer);
233 break;
234 }
235 }
236 }
237 // If we're not done reading the response, read more data.
238 // TODO(roth): In practice, I suspect that the response to a CONNECT
239 // request will never include a body, in which case this check is
240 // sufficient. However, the language of RFC-2817 doesn't explicitly
241 // forbid the response from including a body. If there is a body,
242 // it's possible that we might have parsed part but not all of the
243 // body, in which case this check will cause us to fail to parse the
244 // remainder of the body. If that ever becomes an issue, we may
245 // need to fix the HTTP parser to understand when the body is
246 // complete (e.g., handling chunked transfer encoding or looking
247 // at the Content-Length: header).
248 if (handshaker->http_parser_.state != GRPC_HTTP_BODY) {
249 grpc_slice_buffer_reset_and_unref(handshaker->args_->read_buffer);
250 grpc_endpoint_read(
251 handshaker->args_->endpoint, handshaker->args_->read_buffer,
252 GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
253 &HttpConnectHandshaker::OnReadDoneScheduler,
254 handshaker, grpc_schedule_on_exec_ctx),
255 /*urgent=*/true, /*min_progress_size=*/1);
256 return;
257 }
258 // Make sure we got a 2xx response.
259 if (handshaker->http_response_.status < 200 ||
260 handshaker->http_response_.status >= 300) {
261 error = GRPC_ERROR_CREATE(absl::StrCat("HTTP proxy returned response code ",
262 handshaker->http_response_.status));
263 handshaker->HandshakeFailedLocked(error);
264 goto done;
265 }
266 // Success. Invoke handshake-done callback.
267 ExecCtx::Run(DEBUG_LOCATION, handshaker->on_handshake_done_, error);
268 done:
269 // Set shutdown to true so that subsequent calls to
270 // http_connect_handshaker_shutdown() do nothing.
271 handshaker->is_shutdown_ = true;
272 lock.Release();
273 handshaker->Unref();
274 }
275
276 //
277 // Public handshaker methods
278 //
279
Shutdown(grpc_error_handle why)280 void HttpConnectHandshaker::Shutdown(grpc_error_handle why) {
281 {
282 MutexLock lock(&mu_);
283 if (!is_shutdown_) {
284 is_shutdown_ = true;
285 grpc_endpoint_shutdown(args_->endpoint, why);
286 CleanupArgsForFailureLocked();
287 }
288 }
289 }
290
DoHandshake(grpc_tcp_server_acceptor *,grpc_closure * on_handshake_done,HandshakerArgs * args)291 void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
292 grpc_closure* on_handshake_done,
293 HandshakerArgs* args) {
294 // Check for HTTP CONNECT channel arg.
295 // If not found, invoke on_handshake_done without doing anything.
296 absl::optional<absl::string_view> server_name =
297 args->args.GetString(GRPC_ARG_HTTP_CONNECT_SERVER);
298 if (!server_name.has_value()) {
299 // Set shutdown to true so that subsequent calls to
300 // http_connect_handshaker_shutdown() do nothing.
301 {
302 MutexLock lock(&mu_);
303 is_shutdown_ = true;
304 }
305 ExecCtx::Run(DEBUG_LOCATION, on_handshake_done, absl::OkStatus());
306 return;
307 }
308 // Get headers from channel args.
309 absl::optional<absl::string_view> arg_header_string =
310 args->args.GetString(GRPC_ARG_HTTP_CONNECT_HEADERS);
311 grpc_http_header* headers = nullptr;
312 size_t num_headers = 0;
313 char** header_strings = nullptr;
314 size_t num_header_strings = 0;
315 if (arg_header_string.has_value()) {
316 std::string buffer(*arg_header_string);
317 gpr_string_split(buffer.c_str(), "\n", &header_strings,
318 &num_header_strings);
319 headers = static_cast<grpc_http_header*>(
320 gpr_malloc(sizeof(grpc_http_header) * num_header_strings));
321 for (size_t i = 0; i < num_header_strings; ++i) {
322 char* sep = strchr(header_strings[i], ':');
323
324 if (sep == nullptr) {
325 gpr_log(GPR_ERROR, "skipping unparseable HTTP CONNECT header: %s",
326 header_strings[i]);
327 continue;
328 }
329 *sep = '\0';
330 headers[num_headers].key = header_strings[i];
331 headers[num_headers].value = sep + 1;
332 ++num_headers;
333 }
334 }
335 // Save state in the handshaker object.
336 MutexLock lock(&mu_);
337 args_ = args;
338 on_handshake_done_ = on_handshake_done;
339 // Log connection via proxy.
340 std::string proxy_name(grpc_endpoint_get_peer(args->endpoint));
341 std::string server_name_string(*server_name);
342 gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
343 server_name_string.c_str(), proxy_name.c_str());
344 // Construct HTTP CONNECT request.
345 grpc_http_request request;
346 request.method = const_cast<char*>("CONNECT");
347 request.version = GRPC_HTTP_HTTP10; // Set by OnReadDone
348 request.hdrs = headers;
349 request.hdr_count = num_headers;
350 request.body_length = 0;
351 request.body = nullptr;
352 grpc_slice request_slice = grpc_httpcli_format_connect_request(
353 &request, server_name_string.c_str(), server_name_string.c_str());
354 grpc_slice_buffer_add(&write_buffer_, request_slice);
355 // Clean up.
356 gpr_free(headers);
357 for (size_t i = 0; i < num_header_strings; ++i) {
358 gpr_free(header_strings[i]);
359 }
360 gpr_free(header_strings);
361 // Take a new ref to be held by the write callback.
362 Ref().release();
363 grpc_endpoint_write(
364 args->endpoint, &write_buffer_,
365 GRPC_CLOSURE_INIT(&request_done_closure_,
366 &HttpConnectHandshaker::OnWriteDoneScheduler, this,
367 grpc_schedule_on_exec_ctx),
368 nullptr, /*max_frame_size=*/INT_MAX);
369 }
370
HttpConnectHandshaker()371 HttpConnectHandshaker::HttpConnectHandshaker() {
372 grpc_slice_buffer_init(&write_buffer_);
373 grpc_http_parser_init(&http_parser_, GRPC_HTTP_RESPONSE, &http_response_);
374 }
375
376 //
377 // handshaker factory
378 //
379
380 class HttpConnectHandshakerFactory : public HandshakerFactory {
381 public:
AddHandshakers(const ChannelArgs &,grpc_pollset_set *,HandshakeManager * handshake_mgr)382 void AddHandshakers(const ChannelArgs& /*args*/,
383 grpc_pollset_set* /*interested_parties*/,
384 HandshakeManager* handshake_mgr) override {
385 handshake_mgr->Add(MakeRefCounted<HttpConnectHandshaker>());
386 }
Priority()387 HandshakerPriority Priority() override {
388 return HandshakerPriority::kHTTPConnectHandshakers;
389 }
390 ~HttpConnectHandshakerFactory() override = default;
391 };
392
393 } // namespace
394
RegisterHttpConnectHandshaker(CoreConfiguration::Builder * builder)395 void RegisterHttpConnectHandshaker(CoreConfiguration::Builder* builder) {
396 builder->handshaker_registry()->RegisterHandshakerFactory(
397 HANDSHAKER_CLIENT, std::make_unique<HttpConnectHandshakerFactory>());
398 }
399
400 } // namespace grpc_core
401