1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/http_connect_handshaker.h"
22 
23 #include <limits.h>
24 #include <string.h>
25 
26 #include <memory>
27 #include <string>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34 
35 #include <grpc/slice.h>
36 #include <grpc/slice_buffer.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/config/core_configuration.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/debug_location.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/gprpp/sync.h"
46 #include "src/core/lib/http/format_request.h"
47 #include "src/core/lib/http/parser.h"
48 #include "src/core/lib/iomgr/closure.h"
49 #include "src/core/lib/iomgr/endpoint.h"
50 #include "src/core/lib/iomgr/error.h"
51 #include "src/core/lib/iomgr/exec_ctx.h"
52 #include "src/core/lib/iomgr/iomgr_fwd.h"
53 #include "src/core/lib/iomgr/tcp_server.h"
54 #include "src/core/lib/transport/handshaker.h"
55 #include "src/core/lib/transport/handshaker_factory.h"
56 #include "src/core/lib/transport/handshaker_registry.h"
57 
58 namespace grpc_core {
59 
60 namespace {
61 
62 class HttpConnectHandshaker : public Handshaker {
63  public:
64   HttpConnectHandshaker();
65   void Shutdown(grpc_error_handle why) override;
66   void DoHandshake(grpc_tcp_server_acceptor* acceptor,
67                    grpc_closure* on_handshake_done,
68                    HandshakerArgs* args) override;
name() const69   const char* name() const override { return "http_connect"; }
70 
71  private:
72   ~HttpConnectHandshaker() override;
73   void CleanupArgsForFailureLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
74   void HandshakeFailedLocked(grpc_error_handle error)
75       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
76   static void OnWriteDone(void* arg, grpc_error_handle error);
77   static void OnReadDone(void* arg, grpc_error_handle error);
78   static void OnWriteDoneScheduler(void* arg, grpc_error_handle error);
79   static void OnReadDoneScheduler(void* arg, grpc_error_handle error);
80 
81   Mutex mu_;
82 
83   bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
84   // Endpoint and read buffer to destroy after a shutdown.
85   grpc_endpoint* endpoint_to_destroy_ ABSL_GUARDED_BY(mu_) = nullptr;
86   grpc_slice_buffer* read_buffer_to_destroy_ ABSL_GUARDED_BY(mu_) = nullptr;
87 
88   // State saved while performing the handshake.
89   HandshakerArgs* args_ = nullptr;
90   grpc_closure* on_handshake_done_ = nullptr;
91 
92   // Objects for processing the HTTP CONNECT request and response.
93   grpc_slice_buffer write_buffer_ ABSL_GUARDED_BY(mu_);
94   grpc_closure request_done_closure_ ABSL_GUARDED_BY(mu_);
95   grpc_closure response_read_closure_ ABSL_GUARDED_BY(mu_);
96   grpc_http_parser http_parser_ ABSL_GUARDED_BY(mu_);
97   grpc_http_response http_response_ ABSL_GUARDED_BY(mu_);
98 };
99 
~HttpConnectHandshaker()100 HttpConnectHandshaker::~HttpConnectHandshaker() {
101   if (endpoint_to_destroy_ != nullptr) {
102     grpc_endpoint_destroy(endpoint_to_destroy_);
103   }
104   if (read_buffer_to_destroy_ != nullptr) {
105     grpc_slice_buffer_destroy(read_buffer_to_destroy_);
106     gpr_free(read_buffer_to_destroy_);
107   }
108   grpc_slice_buffer_destroy(&write_buffer_);
109   grpc_http_parser_destroy(&http_parser_);
110   grpc_http_response_destroy(&http_response_);
111 }
112 
113 // Set args fields to nullptr, saving the endpoint and read buffer for
114 // later destruction.
CleanupArgsForFailureLocked()115 void HttpConnectHandshaker::CleanupArgsForFailureLocked() {
116   endpoint_to_destroy_ = args_->endpoint;
117   args_->endpoint = nullptr;
118   read_buffer_to_destroy_ = args_->read_buffer;
119   args_->read_buffer = nullptr;
120   args_->args = ChannelArgs();
121 }
122 
123 // If the handshake failed or we're shutting down, clean up and invoke the
124 // callback with the error.
HandshakeFailedLocked(grpc_error_handle error)125 void HttpConnectHandshaker::HandshakeFailedLocked(grpc_error_handle error) {
126   if (error.ok()) {
127     // If we were shut down after an endpoint operation succeeded but
128     // before the endpoint callback was invoked, we need to generate our
129     // own error.
130     error = GRPC_ERROR_CREATE("Handshaker shutdown");
131   }
132   if (!is_shutdown_) {
133     // TODO(ctiller): It is currently necessary to shutdown endpoints
134     // before destroying them, even if we know that there are no
135     // pending read/write callbacks.  This should be fixed, at which
136     // point this can be removed.
137     grpc_endpoint_shutdown(args_->endpoint, error);
138     // Not shutting down, so the handshake failed.  Clean up before
139     // invoking the callback.
140     CleanupArgsForFailureLocked();
141     // Set shutdown to true so that subsequent calls to
142     // http_connect_handshaker_shutdown() do nothing.
143     is_shutdown_ = true;
144   }
145   // Invoke callback.
146   ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, error);
147 }
148 
149 // This callback can be invoked inline while already holding onto the mutex. To
150 // avoid deadlocks, schedule OnWriteDone on ExecCtx.
OnWriteDoneScheduler(void * arg,grpc_error_handle error)151 void HttpConnectHandshaker::OnWriteDoneScheduler(void* arg,
152                                                  grpc_error_handle error) {
153   auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
154   ExecCtx::Run(DEBUG_LOCATION,
155                GRPC_CLOSURE_INIT(&handshaker->request_done_closure_,
156                                  &HttpConnectHandshaker::OnWriteDone,
157                                  handshaker, grpc_schedule_on_exec_ctx),
158                error);
159 }
160 
161 // Callback invoked when finished writing HTTP CONNECT request.
OnWriteDone(void * arg,grpc_error_handle error)162 void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error_handle error) {
163   auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
164   ReleasableMutexLock lock(&handshaker->mu_);
165   if (!error.ok() || handshaker->is_shutdown_) {
166     // If the write failed or we're shutting down, clean up and invoke the
167     // callback with the error.
168     handshaker->HandshakeFailedLocked(error);
169     lock.Release();
170     handshaker->Unref();
171   } else {
172     // Otherwise, read the response.
173     // The read callback inherits our ref to the handshaker.
174     grpc_endpoint_read(
175         handshaker->args_->endpoint, handshaker->args_->read_buffer,
176         GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
177                           &HttpConnectHandshaker::OnReadDoneScheduler,
178                           handshaker, grpc_schedule_on_exec_ctx),
179         /*urgent=*/true, /*min_progress_size=*/1);
180   }
181 }
182 
183 // This callback can be invoked inline while already holding onto the mutex. To
184 // avoid deadlocks, schedule OnReadDone on ExecCtx.
OnReadDoneScheduler(void * arg,grpc_error_handle error)185 void HttpConnectHandshaker::OnReadDoneScheduler(void* arg,
186                                                 grpc_error_handle error) {
187   auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
188   ExecCtx::Run(DEBUG_LOCATION,
189                GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
190                                  &HttpConnectHandshaker::OnReadDone, handshaker,
191                                  grpc_schedule_on_exec_ctx),
192                error);
193 }
194 
195 // Callback invoked for reading HTTP CONNECT response.
OnReadDone(void * arg,grpc_error_handle error)196 void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error_handle error) {
197   auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
198   ReleasableMutexLock lock(&handshaker->mu_);
199   if (!error.ok() || handshaker->is_shutdown_) {
200     // If the read failed or we're shutting down, clean up and invoke the
201     // callback with the error.
202     handshaker->HandshakeFailedLocked(error);
203     goto done;
204   }
205   // Add buffer to parser.
206   for (size_t i = 0; i < handshaker->args_->read_buffer->count; ++i) {
207     if (GRPC_SLICE_LENGTH(handshaker->args_->read_buffer->slices[i]) > 0) {
208       size_t body_start_offset = 0;
209       error = grpc_http_parser_parse(&handshaker->http_parser_,
210                                      handshaker->args_->read_buffer->slices[i],
211                                      &body_start_offset);
212       if (!error.ok()) {
213         handshaker->HandshakeFailedLocked(error);
214         goto done;
215       }
216       if (handshaker->http_parser_.state == GRPC_HTTP_BODY) {
217         // Remove the data we've already read from the read buffer,
218         // leaving only the leftover bytes (if any).
219         grpc_slice_buffer tmp_buffer;
220         grpc_slice_buffer_init(&tmp_buffer);
221         if (body_start_offset <
222             GRPC_SLICE_LENGTH(handshaker->args_->read_buffer->slices[i])) {
223           grpc_slice_buffer_add(
224               &tmp_buffer,
225               grpc_slice_split_tail(&handshaker->args_->read_buffer->slices[i],
226                                     body_start_offset));
227         }
228         grpc_slice_buffer_addn(&tmp_buffer,
229                                &handshaker->args_->read_buffer->slices[i + 1],
230                                handshaker->args_->read_buffer->count - i - 1);
231         grpc_slice_buffer_swap(handshaker->args_->read_buffer, &tmp_buffer);
232         grpc_slice_buffer_destroy(&tmp_buffer);
233         break;
234       }
235     }
236   }
237   // If we're not done reading the response, read more data.
238   // TODO(roth): In practice, I suspect that the response to a CONNECT
239   // request will never include a body, in which case this check is
240   // sufficient.  However, the language of RFC-2817 doesn't explicitly
241   // forbid the response from including a body.  If there is a body,
242   // it's possible that we might have parsed part but not all of the
243   // body, in which case this check will cause us to fail to parse the
244   // remainder of the body.  If that ever becomes an issue, we may
245   // need to fix the HTTP parser to understand when the body is
246   // complete (e.g., handling chunked transfer encoding or looking
247   // at the Content-Length: header).
248   if (handshaker->http_parser_.state != GRPC_HTTP_BODY) {
249     grpc_slice_buffer_reset_and_unref(handshaker->args_->read_buffer);
250     grpc_endpoint_read(
251         handshaker->args_->endpoint, handshaker->args_->read_buffer,
252         GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
253                           &HttpConnectHandshaker::OnReadDoneScheduler,
254                           handshaker, grpc_schedule_on_exec_ctx),
255         /*urgent=*/true, /*min_progress_size=*/1);
256     return;
257   }
258   // Make sure we got a 2xx response.
259   if (handshaker->http_response_.status < 200 ||
260       handshaker->http_response_.status >= 300) {
261     error = GRPC_ERROR_CREATE(absl::StrCat("HTTP proxy returned response code ",
262                                            handshaker->http_response_.status));
263     handshaker->HandshakeFailedLocked(error);
264     goto done;
265   }
266   // Success.  Invoke handshake-done callback.
267   ExecCtx::Run(DEBUG_LOCATION, handshaker->on_handshake_done_, error);
268 done:
269   // Set shutdown to true so that subsequent calls to
270   // http_connect_handshaker_shutdown() do nothing.
271   handshaker->is_shutdown_ = true;
272   lock.Release();
273   handshaker->Unref();
274 }
275 
276 //
277 // Public handshaker methods
278 //
279 
Shutdown(grpc_error_handle why)280 void HttpConnectHandshaker::Shutdown(grpc_error_handle why) {
281   {
282     MutexLock lock(&mu_);
283     if (!is_shutdown_) {
284       is_shutdown_ = true;
285       grpc_endpoint_shutdown(args_->endpoint, why);
286       CleanupArgsForFailureLocked();
287     }
288   }
289 }
290 
DoHandshake(grpc_tcp_server_acceptor *,grpc_closure * on_handshake_done,HandshakerArgs * args)291 void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
292                                         grpc_closure* on_handshake_done,
293                                         HandshakerArgs* args) {
294   // Check for HTTP CONNECT channel arg.
295   // If not found, invoke on_handshake_done without doing anything.
296   absl::optional<absl::string_view> server_name =
297       args->args.GetString(GRPC_ARG_HTTP_CONNECT_SERVER);
298   if (!server_name.has_value()) {
299     // Set shutdown to true so that subsequent calls to
300     // http_connect_handshaker_shutdown() do nothing.
301     {
302       MutexLock lock(&mu_);
303       is_shutdown_ = true;
304     }
305     ExecCtx::Run(DEBUG_LOCATION, on_handshake_done, absl::OkStatus());
306     return;
307   }
308   // Get headers from channel args.
309   absl::optional<absl::string_view> arg_header_string =
310       args->args.GetString(GRPC_ARG_HTTP_CONNECT_HEADERS);
311   grpc_http_header* headers = nullptr;
312   size_t num_headers = 0;
313   char** header_strings = nullptr;
314   size_t num_header_strings = 0;
315   if (arg_header_string.has_value()) {
316     std::string buffer(*arg_header_string);
317     gpr_string_split(buffer.c_str(), "\n", &header_strings,
318                      &num_header_strings);
319     headers = static_cast<grpc_http_header*>(
320         gpr_malloc(sizeof(grpc_http_header) * num_header_strings));
321     for (size_t i = 0; i < num_header_strings; ++i) {
322       char* sep = strchr(header_strings[i], ':');
323 
324       if (sep == nullptr) {
325         gpr_log(GPR_ERROR, "skipping unparseable HTTP CONNECT header: %s",
326                 header_strings[i]);
327         continue;
328       }
329       *sep = '\0';
330       headers[num_headers].key = header_strings[i];
331       headers[num_headers].value = sep + 1;
332       ++num_headers;
333     }
334   }
335   // Save state in the handshaker object.
336   MutexLock lock(&mu_);
337   args_ = args;
338   on_handshake_done_ = on_handshake_done;
339   // Log connection via proxy.
340   std::string proxy_name(grpc_endpoint_get_peer(args->endpoint));
341   std::string server_name_string(*server_name);
342   gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
343           server_name_string.c_str(), proxy_name.c_str());
344   // Construct HTTP CONNECT request.
345   grpc_http_request request;
346   request.method = const_cast<char*>("CONNECT");
347   request.version = GRPC_HTTP_HTTP10;  // Set by OnReadDone
348   request.hdrs = headers;
349   request.hdr_count = num_headers;
350   request.body_length = 0;
351   request.body = nullptr;
352   grpc_slice request_slice = grpc_httpcli_format_connect_request(
353       &request, server_name_string.c_str(), server_name_string.c_str());
354   grpc_slice_buffer_add(&write_buffer_, request_slice);
355   // Clean up.
356   gpr_free(headers);
357   for (size_t i = 0; i < num_header_strings; ++i) {
358     gpr_free(header_strings[i]);
359   }
360   gpr_free(header_strings);
361   // Take a new ref to be held by the write callback.
362   Ref().release();
363   grpc_endpoint_write(
364       args->endpoint, &write_buffer_,
365       GRPC_CLOSURE_INIT(&request_done_closure_,
366                         &HttpConnectHandshaker::OnWriteDoneScheduler, this,
367                         grpc_schedule_on_exec_ctx),
368       nullptr, /*max_frame_size=*/INT_MAX);
369 }
370 
HttpConnectHandshaker()371 HttpConnectHandshaker::HttpConnectHandshaker() {
372   grpc_slice_buffer_init(&write_buffer_);
373   grpc_http_parser_init(&http_parser_, GRPC_HTTP_RESPONSE, &http_response_);
374 }
375 
376 //
377 // handshaker factory
378 //
379 
380 class HttpConnectHandshakerFactory : public HandshakerFactory {
381  public:
AddHandshakers(const ChannelArgs &,grpc_pollset_set *,HandshakeManager * handshake_mgr)382   void AddHandshakers(const ChannelArgs& /*args*/,
383                       grpc_pollset_set* /*interested_parties*/,
384                       HandshakeManager* handshake_mgr) override {
385     handshake_mgr->Add(MakeRefCounted<HttpConnectHandshaker>());
386   }
Priority()387   HandshakerPriority Priority() override {
388     return HandshakerPriority::kHTTPConnectHandshakers;
389   }
390   ~HttpConnectHandshakerFactory() override = default;
391 };
392 
393 }  // namespace
394 
RegisterHttpConnectHandshaker(CoreConfiguration::Builder * builder)395 void RegisterHttpConnectHandshaker(CoreConfiguration::Builder* builder) {
396   builder->handshaker_registry()->RegisterHandshakerFactory(
397       HANDSHAKER_CLIENT, std::make_unique<HttpConnectHandshakerFactory>());
398 }
399 
400 }  // namespace grpc_core
401