1<?php 2/* 3 * 4 * Copyright 2020 gRPC authors. 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 */ 19 20/** 21 * This is the PHP xDS Interop test client. This script is meant to be run by 22 * the main xDS Interep test runner "run_xds_tests.py", not to be run 23 * by itself standalone. 24 */ 25$autoload_path = realpath(dirname(__FILE__).'/../../vendor/autoload.php'); 26require_once $autoload_path; 27 28class XdsUpdateClientConfigureService 29 extends \Grpc\Testing\XdsUpdateClientConfigureServiceStub 30{ 31 function configure( 32 \Grpc\Testing\ClientConfigureRequest $request, 33 \Grpc\ServerContext $context 34 ): ?\Grpc\Testing\ClientConfigureResponse { 35 $rpc_types = $request->getTypes(); 36 $all_metadata = $request->getMetadata(); 37 $rpcs_to_send = []; 38 foreach ($rpc_types as $rpc_type) { 39 if ($rpc_type == 40 \Grpc\Testing\ClientConfigureRequest\RpcType::EMPTY_CALL) { 41 $rpcs_to_send[] = 'EmptyCall'; 42 } else if ($rpc_type == 43 \Grpc\Testing\ClientConfigureRequest\RpcType::UNARY_CALL) { 44 $rpcs_to_send[] = 'UnaryCall'; 45 } 46 } 47 $metadata_to_send = []; 48 foreach ($all_metadata as $metadata) { 49 $rpc_type = $metadata->getType(); 50 if ($rpc_type == 51 \Grpc\Testing\ClientConfigureRequest\RpcType::EMPTY_CALL) { 52 $rpc_type_key = 'EmptyCall'; 53 } else if ($rpc_type == 54 \Grpc\Testing\ClientConfigureRequest\RpcType::UNARY_CALL) { 55 $rpc_type_key = 'UnaryCall'; 56 } 57 $key = $metadata->getKey(); 58 $value = $metadata->getValue(); 59 if (!isset($metadata_to_send[$rpc_type_key])) { 60 $metadata_to_send[$rpc_type_key] = []; 61 } 62 $metadata_to_send[$rpc_type_key][$key] = $value; 63 } 64 global $client_thread; 65 echo "PHP parent: Setting client_thread rpc_config to \n"; 66 print_r($rpcs_to_send); 67 print_r($metadata_to_send); 68 echo "PHP parent: timeout_sec = ".$request->getTimeoutSec()."\n"; 69 $client_thread->rpc_config->update($rpcs_to_send, 70 $metadata_to_send, 71 $request->getTimeoutSec()); 72 return new Grpc\Testing\ClientConfigureResponse(); 73 } 74} 75 76// The main xds interop test runner will ping this service to ask for 77// the stats of the distribution of the backends, for the next X rpcs. 78class LoadBalancerStatsService 79 extends \Grpc\Testing\LoadBalancerStatsServiceStub 80{ 81 function getClientStats( 82 \Grpc\Testing\LoadBalancerStatsRequest $request, 83 \Grpc\ServerContext $context 84 ): ?\Grpc\Testing\LoadBalancerStatsResponse { 85 $num_rpcs = $request->getNumRpcs(); 86 $timeout_sec = $request->getTimeoutSec(); 87 $rpcs_by_method = []; 88 $rpcs_by_peer = []; 89 $num_failures = 0; 90 91 // Heavy limitation now: the server is blocking, until all 92 // the necessary num_rpcs are finished, or timeout is reached 93 global $client_thread; 94 $start_id = $client_thread->num_results + 1; 95 $end_id = $start_id + $num_rpcs; 96 $now = hrtime(true); 97 $timeout = $now[0] + ($now[1] / 1e9) + $timeout_sec; 98 while (true) { 99 $curr_hr = hrtime(true); 100 $curr_time = $curr_hr[0] + ($curr_hr[1] / 1e9); 101 if ($curr_time > $timeout) { 102 break; 103 } 104 // Thread variable seems to be read-only 105 $curr_id = $client_thread->num_results; 106 if ($curr_id >= $end_id) { 107 break; 108 } 109 usleep(50000); 110 } 111 112 // Tally up results 113 $end_id = min($end_id, $client_thread->num_results); 114 // "$client_thread->results" will be in the form of 115 // [ 116 // 'rpc1' => [ 117 // 'hostname1', '', 'hostname2', 'hostname1', '', ... 118 // ], 119 // 'rpc2' => [ 120 // '', 'hostname1', 'hostname2', '', 'hostname2', ... 121 // ], 122 // ] 123 foreach ((array)$client_thread->rpc_config->rpcs_to_send as $rpc) { 124 $results = $client_thread->results[$rpc]; 125 // initialize, can always start from scratch here 126 $rpcs_by_method[$rpc] = []; 127 for ($i = $start_id; $i < $end_id; $i++) { 128 $hostname = $results[$i]; 129 if ($hostname) { 130 // initialize in case we haven't seen this hostname 131 // before 132 if (!array_key_exists($hostname, $rpcs_by_method[$rpc])) { 133 $rpcs_by_method[$rpc][$hostname] = 0; 134 } 135 if (!array_key_exists($hostname, $rpcs_by_peer)) { 136 $rpcs_by_peer[$hostname] = 0; 137 } 138 // increment the remote hostname distribution histogram 139 // both by overall, and broken down per RPC 140 $rpcs_by_method[$rpc][$hostname] += 1; 141 $rpcs_by_peer[$hostname] += 1; 142 } else { 143 // $num_failures here are counted per individual RPC 144 $num_failures += 1; 145 } 146 } 147 } 148 149 // Convert our hashmaps above into protobuf objects 150 $response = new Grpc\Testing\LoadBalancerStatsResponse(); 151 $rpcs_by_method_map = []; 152 foreach ($rpcs_by_method as $rpc => $rpcs_by_peer_per_method) { 153 $rpcs_by_peer_proto_obj 154 = new Grpc\Testing\LoadBalancerStatsResponse\RpcsByPeer(); 155 $rpcs_by_peer_proto_obj->setRpcsByPeer($rpcs_by_peer_per_method); 156 $rpcs_by_method_map[$rpc] = $rpcs_by_peer_proto_obj; 157 } 158 $response->setRpcsByPeer($rpcs_by_peer); 159 $response->setRpcsByMethod($rpcs_by_method_map); 160 $response->setNumFailures($num_failures); 161 return $response; 162 } 163 164 function GetClientAccumulatedStats( 165 \Grpc\Testing\LoadBalancerAccumulatedStatsRequest $request, 166 \Grpc\ServerContext $context 167 ): ?\Grpc\Testing\LoadBalancerAccumulatedStatsResponse { 168 global $client_thread; 169 $response = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse(); 170 $response->setNumRpcsStartedByMethod( 171 (array)$client_thread->num_rpcs_started_by_method); 172 $response->setNumRpcsSucceededByMethod( 173 (array)$client_thread->num_rpcs_succeeded_by_method); 174 $response->setNumRpcsFailedByMethod( 175 (array)$client_thread->num_rpcs_failed_by_method); 176 $accumulated_method_stats 177 = (array)$client_thread->accumulated_method_stats; 178 $stats_per_method = []; 179 foreach ($accumulated_method_stats as $rpc_name => $stats) { 180 $methodStats 181 = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse\MethodStats(); 182 $methodStats->setRpcsStarted($stats['rpcs_started']); 183 $methodStats->setResult((array)$stats['result']); 184 $stats_per_method[$rpc_name] = $methodStats; 185 } 186 $response->setStatsPerMethod($stats_per_method); 187 return $response; 188 } 189} 190 191class RpcConfig extends Volatile { 192 public $server_address; 193 public $qps; 194 public $fail_on_failed_rpcs; 195 public $rpcs_to_send; 196 public $metadata_to_send; 197 public $tmp_file1; 198 public $tmp_file2; 199 public $timeout_sec; 200 public function __construct($server_address, 201 $qps, 202 $fail_on_failed_rpcs, 203 $rpcs_to_send, 204 $metadata_to_send, 205 $tmp_file1, 206 $tmp_file2) { 207 $this->server_address = $server_address; 208 $this->qps = $qps; 209 $this->fail_on_failed_rpcs = $fail_on_failed_rpcs; 210 $this->rpcs_to_send = (array)$rpcs_to_send; 211 $this->metadata_to_send = (array)$metadata_to_send; 212 $this->tmp_file1 = $tmp_file1; 213 $this->tmp_file2 = $tmp_file2; 214 $this->timeout_sec = 30; 215 } 216 public function update($rpcs_to_send, $metadata_to_send, $timeout_sec) { 217 $this->rpcs_to_send = (array)$rpcs_to_send; 218 $this->metadata_to_send = (array)$metadata_to_send; 219 $this->timeout_sec = $timeout_sec; 220 } 221} 222 223// This client thread blindly sends a unary RPC to the server once 224// every 1 / qps seconds. 225class ClientThread extends Thread { 226 private $target_seconds_between_rpcs_; 227 private $autoload_path_; 228 private $TIMEOUT_US = 30 * 1e6; // 30 seconds 229 public $rpc_config; 230 public $num_results = 0; 231 public $results; 232 233 public $RPC_MAP = [ 234 'UnaryCall' => 'UNARY_CALL', 235 'EmptyCall' => 'EMPTY_CALL', 236 ]; 237 238 public $num_rpcs_started_by_method = []; 239 public $num_rpcs_succeeded_by_method = []; 240 public $num_rpcs_failed_by_method = []; 241 public $accumulated_method_stats = []; 242 243 public function __construct($rpc_config, 244 $autoload_path) { 245 $this->rpc_config = $rpc_config; 246 $this->target_seconds_between_rpcs_ = 1.0 / $rpc_config->qps; 247 $this->autoload_path_ = $autoload_path; 248 $this->simple_request = new Grpc\Testing\SimpleRequest(); 249 $this->empty_request = new Grpc\Testing\EmptyMessage(); 250 $this->results = []; 251 foreach (['UnaryCall', 'EmptyCall'] as $rpc) { 252 $this->results[$rpc] = []; 253 } 254 $this->outstanding_rpcs = []; 255 foreach (['UNARY_CALL', 'EMPTY_CALL'] as $rpc_stats_key) { 256 $this->num_rpcs_started_by_method[$rpc_stats_key] = 0; 257 $this->num_rpcs_succeeded_by_method[$rpc_stats_key] = 0; 258 $this->num_rpcs_failed_by_method[$rpc_stats_key] = 0; 259 $this->accumulated_method_stats[$rpc_stats_key] = [ 260 'rpcs_started' => 0, 261 'result' => [], 262 ]; 263 } 264 } 265 266 public function sendUnaryCall($stub, $metadata) { 267 $timeout = $this->rpc_config->timeout_sec ? 268 $this->rpc_config->timeout_sec * 1e6 : 269 $this->TIMEOUT_US; 270 return $stub->UnaryCall($this->simple_request, 271 $metadata, 272 ['timeout' => $timeout]); 273 } 274 275 public function sendEmptyCall($stub, $metadata) { 276 $timeout = $this->rpc_config->timeout_sec ? 277 $this->rpc_config->timeout_sec * 1e6 : 278 $this->TIMEOUT_US; 279 return $stub->EmptyCall($this->empty_request, 280 $metadata, 281 ['timeout' => $timeout]); 282 } 283 284 public function add_rpc_result($rpc, $status_code) { 285 // $rpc here needs to be in the format of 'UnaryCall', 'EmptyCall' 286 if (!isset($this->accumulated_method_stats[$this->RPC_MAP[$rpc]] 287 ['result'][$status_code])) { 288 $this->accumulated_method_stats[$this->RPC_MAP[$rpc]] 289 ['result'][$status_code] = 0; 290 } 291 $this->accumulated_method_stats[$this->RPC_MAP[$rpc]] 292 ['result'][$status_code] += 1; 293 } 294 295 public function check_child_process_result() { 296 if (sizeof($this->outstanding_rpcs) > 0 && 297 $this->rpc_config->tmp_file2) { 298 $keys_to_delete = []; 299 // tmp_file2 contains the RPC result of each RPC we 300 // originally wrote to tmp_file1 301 $f2 = fopen($this->rpc_config->tmp_file2, 'r+'); 302 flock($f2, LOCK_EX); 303 while (true) { 304 $f2_line = fgets($f2); 305 if (!$f2_line) { 306 break; 307 } 308 // format here needs to be in sync with 309 // src/php/bin/xds_manager.py 310 $parts = explode(',', trim($f2_line)); 311 $key = $parts[0]; 312 $returncode = $parts[1]; 313 if (isset($this->outstanding_rpcs[$key])) { 314 $parts2 = explode('|', $key); 315 $result_num = $parts2[0]; 316 $rpc_name = $parts2[1]; 317 // Child processes can only communicate back the 318 // status code for now. 319 // Current interop test specs only call for 320 // reporting back the status code in these scenarios. 321 // If we ever need the hostname reported back from 322 // child processes, we need to enhance this 323 // communication framework through tmp files. 324 $this->results[$rpc_name][$result_num] = ""; 325 if ($returncode) { 326 $this->num_rpcs_failed_by_method 327 [$this->RPC_MAP[$rpc_name]] += 1; 328 } else { 329 $this->num_rpcs_succeeded_by_method 330 [$this->RPC_MAP[$rpc_name]] += 1; 331 } 332 $this->add_rpc_result($rpc_name, $returncode); 333 $keys_to_delete[] = $key; 334 } 335 } 336 foreach ($keys_to_delete as $key) { 337 unset($this->outstanding_rpcs[$key]); 338 } 339 ftruncate($f2, 0); 340 flock($f2, LOCK_UN); 341 fclose($f2); 342 } 343 } 344 345 public function execute_rpc_in_child_process($rpc, $metadata_serialized) { 346 // tmp_file1 contains the list of RPCs (and their 347 // specs) we want executed. This will be picked up 348 // by src/php/bin/xds_manager.py 349 $f1 = fopen($this->rpc_config->tmp_file1, 'a'); 350 $key = implode('|', [$this->num_results, 351 $rpc, 352 $metadata_serialized, 353 $this->rpc_config->timeout_sec]); 354 flock($f1, LOCK_EX); 355 fwrite($f1, $key."\n"); 356 fflush($f1); 357 flock($f1, LOCK_UN); 358 fclose($f1); 359 $this->outstanding_rpcs[$key] = 1; 360 $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1; 361 $this->accumulated_method_stats[$this->RPC_MAP[$rpc]] 362 ['rpcs_started'] += 1; 363 } 364 365 public function run() { 366 // Autoloaded classes do not get inherited in threads. 367 // Hence we need to do this. 368 require_once($this->autoload_path_); 369 370 $stub = new Grpc\Testing\TestServiceClient( 371 $this->rpc_config->server_address, 372 ['credentials' => Grpc\ChannelCredentials::createInsecure() 373 ]); 374 // hrtime returns nanoseconds 375 $target_next_start_us = hrtime(true) / 1000; 376 while (true) { 377 $now_us = hrtime(true) / 1000; 378 $sleep_us = $target_next_start_us - $now_us; 379 if ($sleep_us < 0) { 380 $target_next_start_us = 381 $now_us + ($this->target_seconds_between_rpcs_ * 1e6); 382 } else { 383 $target_next_start_us += 384 ($this->target_seconds_between_rpcs_ * 1e6); 385 usleep($sleep_us); 386 } 387 $this->check_child_process_result(); 388 foreach ($this->rpc_config->rpcs_to_send as $rpc) { 389 $metadata_to_send_arr 390 = (array)$this->rpc_config->metadata_to_send; 391 $metadata = array_key_exists($rpc, $metadata_to_send_arr) ? 392 $metadata_to_send_arr[$rpc] : []; 393 // This copy is somehow necessary because 394 // $this->metadata_to_send[$rpc] somehow becomes a 395 // Volatile object, instead of an associative array. 396 $metadata_array = []; 397 $execute_in_child_process = false; 398 foreach ($metadata as $key => $value) { 399 $metadata_array[$key] = [$value]; 400 if ($key == 'rpc-behavior' || $key == 'fi_testcase') { 401 $execute_in_child_process = true; 402 } 403 } 404 if ($execute_in_child_process && $this->rpc_config->tmp_file1) { 405 // if 'rpc-behavior' is set, we need to pawn off 406 // the execution to some other child PHP processes 407 $this->execute_rpc_in_child_process( 408 $rpc, serialize($metadata_array)); 409 continue; 410 } 411 // Execute RPC within this script 412 $call = null; 413 if ($rpc == 'UnaryCall') { 414 $call = $this->sendUnaryCall($stub, $metadata_array); 415 } else if ($rpc == 'EmptyCall') { 416 $call = $this->sendEmptyCall($stub, $metadata_array); 417 } else { 418 throw new Exception("Unhandled rpc $rpc"); 419 } 420 $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1; 421 $this->accumulated_method_stats[$this->RPC_MAP[$rpc]] 422 ['rpcs_started'] += 1; 423 // the remote peer is being returned as part of the 424 // initial metadata, according to the test spec 425 $initial_metadata = $call->getMetadata(); 426 list($response, $status) = $call->wait(); 427 if ($status->code == Grpc\STATUS_OK && 428 array_key_exists('hostname', $initial_metadata)) { 429 $this->results[$rpc][$this->num_results] 430 = $initial_metadata['hostname'][0]; 431 $this->num_rpcs_succeeded_by_method 432 [$this->RPC_MAP[$rpc]] += 1; 433 $this->add_rpc_result($rpc, 0); 434 } else { 435 if ($this->rpc_config->fail_on_failed_rpcs_) { 436 throw new Exception("$rpc failed with status " 437 . $status->code); 438 } 439 $this->results[$rpc][$this->num_results] = ""; 440 $this->num_rpcs_failed_by_method 441 [$this->RPC_MAP[$rpc]] += 1; 442 $this->add_rpc_result($rpc, $status->code); 443 } 444 } 445 // $num_results here is only incremented when the group of 446 // all $rpcs_to_send are done. 447 $this->num_results++; 448 } 449 } 450 451 // This is needed for loading autoload_path in the child thread 452 public function start(int $options = PTHREADS_INHERIT_ALL) { 453 return parent::start(PTHREADS_INHERIT_NONE); 454 } 455} 456 457 458// Note: num_channels are currently ignored for now 459$args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:', 460 'rpc:', 'metadata:', 'tmp_file1:', 'tmp_file2:', 461 'server:', 'stats_port:', 'qps:']); 462 463// Convert input in the form of 464// rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3 465// into 466// [ 467// 'rpc1' => [ 468// 'k1' => 'v1', 469// 'k3' => 'v3', 470// ], 471// 'rpc2' => [ 472// 'k2' => 'v2' 473// ], 474// ] 475$metadata_to_send = []; 476if ($_all_metadata = explode(',', $args['metadata'])) { 477 foreach ($_all_metadata as $one_metadata_pair) { 478 list($rpc, 479 $metadata_key, 480 $metadata_value) = explode(':', $one_metadata_pair); 481 // initialize in case we haven't seen this rpc before 482 if (!array_key_exists($rpc, $metadata_to_send)) { 483 $metadata_to_send[$rpc] = []; 484 } 485 $metadata_to_send[$rpc][$metadata_key] = $metadata_value; 486 } 487} 488$rpcs_to_send = (empty($args['rpc']) ? 'UnaryCall' : $args['rpc']); 489 490// Need to communicate the xds server name to the async runner manager 491if ($args['tmp_file1']) { 492 $f1 = fopen($args['tmp_file1'], 'w'); 493 fwrite($f1, 'server_address,'.$args['server']); 494 fclose($f1); 495} 496 497$rpc_config = new RpcConfig($args['server'], 498 $args['qps'], 499 $args['fail_on_failed_rpcs'], 500 explode(',', $rpcs_to_send), 501 $metadata_to_send, 502 $args['tmp_file1'], 503 $args['tmp_file2']); 504 505 506$client_thread = new ClientThread($rpc_config, 507 $autoload_path); 508$client_thread->start(); 509 510$server = new Grpc\RpcServer(); 511$server->addHttp2Port('0.0.0.0:'.$args['stats_port']); 512$server->handle(new LoadBalancerStatsService()); 513$server->handle(new XdsUpdateClientConfigureService()); 514$server->run(); 515