<?php
/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

namespace Grpc;

/**
 * This is an experimental and incomplete implementation of gRPC server
 * for PHP. APIs are _definitely_ going to be changed.
 *
 * DO NOT USE in production.
 */

/**
 * Class RpcServer
 *
 * Keeps a map from full method path to MethodDescriptor and, in run(),
 * dispatches every incoming call to the handler registered for its path.
 *
 * @package Grpc
 */
class RpcServer extends Server
{
    // [ <String method_full_path> => MethodDescriptor ]
    private $paths_map = [];

    /**
     * Fetch the next incoming call event by delegating to requestCall().
     *
     * @return mixed The event returned by requestCall(); run() reads its
     *               `method` and `call` properties.
     */
    private function waitForNextEvent()
    {
        return $this->requestCall();
    }

    /**
     * Add a service to this server
     *
     * The method descriptors of the service are merged into the path map.
     * When a path is already registered, a warning is written to STDERR and
     * the newly supplied descriptor replaces the previous one.
     *
     * @param Object $service The service to be added; it must provide
     *                        getMethodDescriptors() returning an array keyed
     *                        by full method path
     *
     * @return array The complete path map after the service has been merged
     */
    public function handle($service)
    {
        $methodDescriptors = $service->getMethodDescriptors();
        $exist_methods = array_intersect_key($this->paths_map, $methodDescriptors);
        if (!empty($exist_methods)) {
            fwrite(STDERR, "WARNING: " . 'override already registered methods: ' .
                implode(', ', array_keys($exist_methods)) . PHP_EOL);
        }

        // For string keys array_merge lets the later array win, which is the
        // "override" announced by the warning above.
        $this->paths_map = array_merge($this->paths_map, $methodDescriptors);
        return $this->paths_map;
    }

    /**
     * Start the server and serve calls in an endless loop.
     *
     * Calls to a path that is not registered are answered with an
     * "unimplemented" status. Anything thrown while a registered handler is
     * being processed is reported to the caller as STATUS_INTERNAL and the
     * loop continues. An exception raised outside of handler processing
     * (e.g. while waiting for the next event) is written to STDERR and the
     * process exits with code 1.
     *
     * This method does not return.
     */
    public function run()
    {
        $this->start();
        while (true) {
            try {
                // This blocks until the server receives a request
                $event = $this->waitForNextEvent();

                $full_path = $event->method;
                $context = new ServerContext($event);
                $server_writer = new ServerCallWriter($event->call, $context);

                if (!array_key_exists($full_path, $this->paths_map)) {
                    $context->setStatus(Status::unimplemented());
                    $server_writer->finish();
                    continue;
                }

                $method_desc = $this->paths_map[$full_path];
                $server_reader = new ServerCallReader(
                    $event->call,
                    $method_desc->request_type
                );

                try {
                    $this->processCall(
                        $method_desc,
                        $server_reader,
                        $server_writer,
                        $context
                    );
                } catch (\Throwable $e) {
                    // \Throwable rather than \Exception: an \Error raised by
                    // a handler (e.g. a TypeError) must fail only this call,
                    // not take the whole server down.
                    $context->setStatus(Status::status(
                        STATUS_INTERNAL,
                        $e->getMessage()
                    ));
                    $server_writer->finish();
                }
            } catch (\Exception $e) {
                fwrite(STDERR, "ERROR: " . $e->getMessage() . PHP_EOL);
                exit(1);
            }
        }
    }

    /**
     * Invoke the service method described by $method_desc.
     *
     * The arguments passed to the handler depend on the call type:
     *  - UNARY_CALL:            (request, context); the return value is
     *                           passed to $server_writer->finish()
     *  - SERVER_STREAMING_CALL: (request, writer, context)
     *  - CLIENT_STREAMING_CALL: (reader, context); the return value is
     *                           passed to $server_writer->finish()
     *  - BIDI_STREAMING_CALL:   (reader, writer, context)
     *
     * For the two variants that receive the writer, finish() is not called
     * here; presumably the handler is expected to do so itself.
     *
     * @param MethodDescriptor $method_desc   Descriptor of the target method
     * @param ServerCallReader $server_reader Reader for incoming messages
     * @param ServerCallWriter $server_writer Writer for outgoing messages
     * @param ServerContext    $context       Context of the current call
     *
     * @throws \Exception When the descriptor carries an unknown call type
     */
    private function processCall(
        MethodDescriptor $method_desc,
        ServerCallReader $server_reader,
        ServerCallWriter $server_writer,
        ServerContext $context
    ) {
        $handler = [$method_desc->service, $method_desc->method_name];

        // Dispatch to actual server logic
        switch ($method_desc->call_type) {
            case MethodDescriptor::UNARY_CALL:
                $request = $server_reader->read();
                $response = call_user_func(
                    $handler,
                    // A null read is replaced by a fresh request instance so
                    // the handler always receives an object.
                    $request ?? new $method_desc->request_type(),
                    $context
                );
                $server_writer->finish($response);
                break;
            case MethodDescriptor::SERVER_STREAMING_CALL:
                $request = $server_reader->read();
                call_user_func(
                    $handler,
                    $request ?? new $method_desc->request_type(),
                    $server_writer,
                    $context
                );
                break;
            case MethodDescriptor::CLIENT_STREAMING_CALL:
                $response = call_user_func(
                    $handler,
                    $server_reader,
                    $context
                );
                $server_writer->finish($response);
                break;
            case MethodDescriptor::BIDI_STREAMING_CALL:
                call_user_func(
                    $handler,
                    $server_reader,
                    $server_writer,
                    $context
                );
                break;
            default:
                // run() forwards this message as the INTERNAL status detail,
                // so it must not be empty.
                throw new \Exception(
                    'Unsupported call type: ' .
                    var_export($method_desc->call_type, true)
                );
        }
    }
}