<?php
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

namespace Grpc;

/**
 * Base class for generated client stubs. Stub methods are expected to call
 * _simpleRequest, _clientStreamRequest, _serverStreamRequest or _bidiRequest
 * and return the result.
 */
class BaseStub
{
    private $hostname;
    private $hostname_override;
    private $channel;
    private $call_invoker;

    // a callback function
    private $update_metadata;

    /**
     * @param string $hostname
     * @param array  $opts
     *  - 'update_metadata': (optional) a callback function which takes in a
     * metadata array, and returns an updated metadata array. It is removed
     * from $opts before the channel is created; a non-callable value is
     * silently ignored.
     *  - 'grpc.primary_user_agent': (optional) a user-agent string
     *  - 'grpc.ssl_target_name_override': (optional) when non-empty, used
     * instead of $hostname when building the JWT audience URI
     *  - 'grpc_call_invoker': (optional) a call invoker object; when set, the
     * channel is obtained from its createChannelFactory() method and the
     * $channel argument is ignored
     *  - 'credentials': required whenever this constructor has to build the
     * channel options itself (i.e. when no $channel is passed)
     * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
     *
     * @throws \Exception if $channel is neither a Channel nor an
     *                    InterceptorChannel, or if 'credentials' is missing
     *                    when the channel options are built
     */
    public function __construct($hostname, $opts, $channel = null)
    {
        // Install the bundled root certificates unless the extension reports
        // that default roots have already been set.
        if (!method_exists('Grpc\ChannelCredentials', 'isDefaultRootsPemSet') ||
            !ChannelCredentials::isDefaultRootsPemSet()) {
            $ssl_roots = file_get_contents(
                dirname(__FILE__).'/../../../../etc/roots.pem'
            );
            ChannelCredentials::setDefaultRootsPem($ssl_roots);
        }

        $this->hostname = $hostname;
        $this->update_metadata = null;
        if (isset($opts['update_metadata'])) {
            if (is_callable($opts['update_metadata'])) {
                $this->update_metadata = $opts['update_metadata'];
            }
            // Not a channel argument: strip it before the options are
            // forwarded to the channel.
            unset($opts['update_metadata']);
        }
        if (!empty($opts['grpc.ssl_target_name_override'])) {
            $this->hostname_override = $opts['grpc.ssl_target_name_override'];
        }
        if (isset($opts['grpc_call_invoker'])) {
            $this->call_invoker = $opts['grpc_call_invoker'];
            unset($opts['grpc_call_invoker']);
            $channel_opts = self::updateOpts($opts);
            // If the grpc_call_invoker is defined, use the channel created by the call invoker.
            $this->channel = $this->call_invoker->createChannelFactory($hostname, $channel_opts);
            return;
        }
        $this->call_invoker = new DefaultCallInvoker();
        if ($channel) {
            if (!($channel instanceof \Grpc\Channel) &&
                !($channel instanceof \Grpc\Internal\InterceptorChannel)) {
                throw new \Exception('The channel argument is not a Channel object '.
                    'or an InterceptorChannel object created by '.
                    'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
            }
            $this->channel = $channel;
            return;
        }

        $this->channel = static::getDefaultChannel($hostname, $opts);
    }

    /**
     * Prepares the option array handed to a channel: appends the
     * "grpc-php/<version>" token to 'grpc.primary_user_agent' and verifies
     * that the 'credentials' key is present.
     *
     * @param array $opts Channel options supplied by the caller
     *
     * @return array The updated options
     * @throws \Exception if the 'credentials' key is absent
     */
    private static function updateOpts($opts)
    {
        if (!empty($opts['grpc.primary_user_agent'])) {
            // Keep the caller's user agent and separate it from ours.
            $opts['grpc.primary_user_agent'] .= ' ';
        } else {
            $opts['grpc.primary_user_agent'] = '';
        }
        if (defined('\Grpc\VERSION')) {
            $version_str = \Grpc\VERSION;
        } else {
            // No version constant available: fall back to the version
            // recorded in the package's composer.json.
            if (!file_exists($composerFile = __DIR__.'/../../composer.json')) {
                // for grpc/grpc-php subpackage
                $composerFile = __DIR__.'/../composer.json';
            }
            $package_config = json_decode(file_get_contents($composerFile), true);
            $version_str = $package_config['version'];
        }
        $opts['grpc.primary_user_agent'] .= 'grpc-php/'.$version_str;
        if (!array_key_exists('credentials', $opts)) {
            throw new \Exception("The opts['credentials'] key is now ".
                'required. Please see one of the '.
                'ChannelCredentials::create methods');
        }
        return $opts;
    }

    /**
     * Creates and returns the default Channel
     *
     * @param string $hostname The target passed to the Channel constructor
     * @param array  $opts     Channel constructor options
     *
     * @return Channel The channel
     * @throws \Exception if $opts has no 'credentials' key
     */
    public static function getDefaultChannel($hostname, array $opts)
    {
        $channel_opts = self::updateOpts($opts);
        return new Channel($hostname, $channel_opts);
    }

    /**
     * @return string The URI of the endpoint
     */
    public function getTarget()
    {
        return $this->channel->getTarget();
    }

    /**
     * @param bool $try_to_connect (optional)
     *
     * @return int The grpc connectivity state
     */
    public function getConnectivityState($try_to_connect = false)
    {
        return $this->channel->getConnectivityState($try_to_connect);
    }

    /**
     * @param int $timeout in microseconds
     *
     * @return bool true if channel is ready
     * @throws \Exception if channel is in FATAL_ERROR state
     */
    public function waitForReady($timeout)
    {
        // Passing true asks the channel to start connecting.
        $new_state = $this->getConnectivityState(true);
        if ($this->_checkConnectivityState($new_state)) {
            return true;
        }

        $now = Timeval::now();
        $delta = new Timeval($timeout);
        $deadline = $now->add($delta);

        while ($this->channel->watchConnectivityState($new_state, $deadline)) {
            // state has changed before deadline
            $new_state = $this->getConnectivityState();
            if ($this->_checkConnectivityState($new_state)) {
                return true;
            }
        }
        // deadline has passed
        $new_state = $this->getConnectivityState();

        return $this->_checkConnectivityState($new_state);
    }

    /**
     * Close the communication channel associated with this stub.
     */
    public function close()
    {
        $this->channel->close();
    }

    /**
     * @param int $new_state Connectivity state to inspect
     *
     * @return bool true if state is CHANNEL_READY
     * @throws \Exception if state is CHANNEL_FATAL_FAILURE
     */
    private function _checkConnectivityState($new_state)
    {
        if ($new_state == \Grpc\CHANNEL_READY) {
            return true;
        }
        if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) {
            throw new \Exception('Failed to connect to server');
        }

        return false;
    }

    /**
     * constructs the auth uri for the jwt.
     *
     * The URI is "https://" + host + everything in $method before its last
     * slash. The host is the 'grpc.ssl_target_name_override' option when one
     * was given, otherwise the constructor hostname; a trailing ":443" is
     * stripped.
     *
     * @param string $method The method string
     *
     * @return string The URL string
     * @throws \InvalidArgumentException if $method contains no slash
     */
    private function _get_jwt_aud_uri($method)
    {
        // TODO(jtattermusch): This is not the correct implementation
        // of extracting JWT "aud" claim. We should rely on
        // grpc_metadata_credentials_plugin which
        // also provides the correct value of "aud" claim
        // in the grpc_auth_metadata_context.service_url field.
        // Trying to do the construction of "aud" field ourselves
        // is bad.
        $last_slash_idx = strrpos($method, '/');
        if ($last_slash_idx === false) {
            throw new \InvalidArgumentException(
                'service name must have a slash'
            );
        }
        $service_name = substr($method, 0, $last_slash_idx);

        $hostname = $this->hostname_override ?: $this->hostname;

        // Remove the port if it is 443
        // See https://github.com/grpc/grpc/blob/07c9f7a36b2a0d34fcffebc85649cf3b8c339b5d/src/core/lib/security/transport/client_auth_filter.cc#L205
        if ((strlen($hostname) > 4) && (substr($hostname, -4) === ":443")) {
            $hostname = substr($hostname, 0, -4);
        }

        return 'https://'.$hostname.$service_name;
    }

    /**
     * validate and normalize the metadata array.
     *
     * Keys are lower-cased; if two keys differ only by case, the later entry
     * overwrites the earlier one.
     *
     * @param array $metadata The metadata map
     *
     * @return array $metadata Validated and key-normalized metadata map
     * @throws \InvalidArgumentException if key contains invalid characters
     */
    private function _validate_and_normalize_metadata($metadata)
    {
        $metadata_copy = [];
        foreach ($metadata as $key => $value) {
            // The D modifier makes "$" match only at the very end of the
            // subject; without it a key ending in "\n" would be accepted.
            if (!preg_match('/^[.A-Za-z\d_-]+$/D', $key)) {
                throw new \InvalidArgumentException(
                    'Metadata keys must be nonempty strings containing only '.
                    'alphanumeric characters, hyphens, underscores and dots'
                );
            }
            $metadata_copy[strtolower($key)] = $value;
        }

        return $metadata_copy;
    }

    /**
     * Produces the metadata actually sent with a call: runs the optional
     * update_metadata callback (given the metadata and the JWT audience URI
     * for $method), then validates and normalizes the result.
     *
     * @param string $method   The method string
     * @param array  $metadata The caller-supplied metadata map
     *
     * @return array Validated and key-normalized metadata map
     * @throws \InvalidArgumentException if $method has no slash or a
     *                                   metadata key is invalid
     */
    private function _build_call_metadata($method, $metadata)
    {
        $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
        if (is_callable($this->update_metadata)) {
            $metadata = call_user_func(
                $this->update_metadata,
                $metadata,
                $jwt_aud_uri
            );
        }

        return $this->_validate_and_normalize_metadata($metadata);
    }

    /**
     * Create a function which can be used to create UnaryCall
     *
     * @param Channel|InterceptorChannel $channel
     *
     * @return \Closure taking ($method, $argument, $deserialize,
     *                  array $metadata, array $options) and returning the
     *                  started call
     */
    private function _GrpcUnaryUnary($channel)
    {
        return function ($method,
                         $argument,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->UnaryCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_build_call_metadata($method, $metadata);
            $call->start($argument, $metadata, $options);
            return $call;
        };
    }

    /**
     * Create a function which can be used to create ClientStreamingCall
     *
     * @param Channel|InterceptorChannel $channel
     *
     * @return \Closure taking ($method, $deserialize, array $metadata,
     *                  array $options) and returning the started call
     */
    private function _GrpcStreamUnary($channel)
    {
        return function ($method,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->ClientStreamingCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_build_call_metadata($method, $metadata);
            $call->start($metadata);
            return $call;
        };
    }

    /**
     * Create a function which can be used to create ServerStreamingCall
     *
     * @param Channel|InterceptorChannel $channel
     *
     * @return \Closure taking ($method, $argument, $deserialize,
     *                  array $metadata, array $options) and returning the
     *                  started call
     */
    private function _GrpcUnaryStream($channel)
    {
        return function ($method,
                         $argument,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->ServerStreamingCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_build_call_metadata($method, $metadata);
            $call->start($argument, $metadata, $options);
            return $call;
        };
    }

    /**
     * Create a function which can be used to create BidiStreamingCall
     *
     * @param Channel|InterceptorChannel $channel
     *
     * @return \Closure taking ($method, $deserialize, array $metadata,
     *                  array $options) and returning the started call
     */
    private function _GrpcStreamStream($channel)
    {
        return function ($method,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->BidiStreamingCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_build_call_metadata($method, $metadata);
            $call->start($metadata);

            return $call;
        };
    }

    /**
     * Create a function which can be used to create UnaryCall
     *
     * For an InterceptorChannel the returned function hands the request to
     * the channel's interceptor, passing as continuation the factory built
     * for the next channel in the chain; otherwise the plain gRPC factory is
     * returned.
     *
     * @param Channel|InterceptorChannel $channel
     *
     * @return \Closure
     */
    private function _UnaryUnaryCallFactory($channel)
    {
        if ($channel instanceof \Grpc\Internal\InterceptorChannel) {
            return function ($method,
                             $argument,
                             $deserialize,
                             array $metadata = [],
                             array $options = []) use ($channel) {
                return $channel->getInterceptor()->interceptUnaryUnary(
                    $method,
                    $argument,
                    $deserialize,
                    $this->_UnaryUnaryCallFactory($channel->getNext()),
                    $metadata,
                    $options
                );
            };
        }
        return $this->_GrpcUnaryUnary($channel);
    }

    /**
     * Create a function which can be used to create ServerStreamingCall
     *
     * For an InterceptorChannel the returned function hands the request to
     * the channel's interceptor, passing as continuation the factory built
     * for the next channel in the chain; otherwise the plain gRPC factory is
     * returned.
     *
     * @param Channel|InterceptorChannel $channel
     *
     * @return \Closure
     */
    private function _UnaryStreamCallFactory($channel)
    {
        if ($channel instanceof \Grpc\Internal\InterceptorChannel) {
            return function ($method,
                             $argument,
                             $deserialize,
                             array $metadata = [],
                             array $options = []) use ($channel) {
                return $channel->getInterceptor()->interceptUnaryStream(
                    $method,
                    $argument,
                    $deserialize,
                    $this->_UnaryStreamCallFactory($channel->getNext()),
                    $metadata,
                    $options
                );
            };
        }
        return $this->_GrpcUnaryStream($channel);
    }

    /**
     * Create a function which can be used to create ClientStreamingCall
     *
     * For an InterceptorChannel the returned function hands the request to
     * the channel's interceptor, passing as continuation the factory built
     * for the next channel in the chain; otherwise the plain gRPC factory is
     * returned.
     *
     * @param Channel|InterceptorChannel $channel
     *
     * @return \Closure
     */
    private function _StreamUnaryCallFactory($channel)
    {
        if ($channel instanceof \Grpc\Internal\InterceptorChannel) {
            return function ($method,
                             $deserialize,
                             array $metadata = [],
                             array $options = []) use ($channel) {
                return $channel->getInterceptor()->interceptStreamUnary(
                    $method,
                    $deserialize,
                    $this->_StreamUnaryCallFactory($channel->getNext()),
                    $metadata,
                    $options
                );
            };
        }
        return $this->_GrpcStreamUnary($channel);
    }

    /**
     * Create a function which can be used to create BidiStreamingCall
     *
     * For an InterceptorChannel the returned function hands the request to
     * the channel's interceptor, passing as continuation the factory built
     * for the next channel in the chain; otherwise the plain gRPC factory is
     * returned.
     *
     * @param Channel|InterceptorChannel $channel
     *
     * @return \Closure
     */
    private function _StreamStreamCallFactory($channel)
    {
        if ($channel instanceof \Grpc\Internal\InterceptorChannel) {
            return function ($method,
                             $deserialize,
                             array $metadata = [],
                             array $options = []) use ($channel) {
                return $channel->getInterceptor()->interceptStreamStream(
                    $method,
                    $deserialize,
                    $this->_StreamStreamCallFactory($channel->getNext()),
                    $metadata,
                    $options
                );
            };
        }
        return $this->_GrpcStreamStream($channel);
    }

    /* This class is intended to be subclassed by generated code, so
     * all functions begin with "_" to avoid name collisions. */
    /**
     * Call a remote method that takes a single argument and has a
     * single output.
     *
     * @param string   $method      The name of the method to call
     * @param mixed    $argument    The argument to the method
     * @param callable $deserialize A function that deserializes the response
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return UnaryCall The active call object
     */
    protected function _simpleRequest(
        $method,
        $argument,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_UnaryUnaryCallFactory($this->channel);
        $call = $call_factory($method, $argument, $deserialize, $metadata, $options);
        return $call;
    }

    /**
     * Call a remote method that takes a stream of arguments and has a single
     * output.
     *
     * @param string   $method      The name of the method to call
     * @param callable $deserialize A function that deserializes the response
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return ClientStreamingCall The active call object
     */
    protected function _clientStreamRequest(
        $method,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_StreamUnaryCallFactory($this->channel);
        $call = $call_factory($method, $deserialize, $metadata, $options);
        return $call;
    }

    /**
     * Call a remote method that takes a single argument and returns a stream
     * of responses.
     *
     * @param string   $method      The name of the method to call
     * @param mixed    $argument    The argument to the method
     * @param callable $deserialize A function that deserializes the responses
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return ServerStreamingCall The active call object
     */
    protected function _serverStreamRequest(
        $method,
        $argument,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_UnaryStreamCallFactory($this->channel);
        $call = $call_factory($method, $argument, $deserialize, $metadata, $options);
        return $call;
    }

    /**
     * Call a remote method with messages streaming in both directions.
     *
     * @param string   $method      The name of the method to call
     * @param callable $deserialize A function that deserializes the responses
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return BidiStreamingCall The active call object
     */
    protected function _bidiRequest(
        $method,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_StreamStreamCallFactory($this->channel);
        $call = $call_factory($method, $deserialize, $metadata, $options);
        return $call;
    }
}