#!/usr/bin/env ruby

# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This is the xDS interop test Ruby client. This is meant to be run by
# the run_xds_tests.py test runner.
#
# Usage: $ tools/run_tests/run_xds_tests.py --test_case=... ...
#    --client_cmd="path/to/xds_client.rb --server=<hostname> \
#                  --stats_port=<port> \
#                  --qps=<qps>"

# These lines are required for the generated files to load grpc
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
pb_dir = File.dirname(this_dir)
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir)

require 'optparse'
require 'logger'

require_relative '../../lib/grpc'
require 'google/protobuf'

require_relative '../src/proto/grpc/testing/empty_pb'
require_relative '../src/proto/grpc/testing/messages_pb'
require_relative '../src/proto/grpc/testing/test_services_pb'

# RpcConfig bundles what the client should send: the list of RPCs, the
# metadata to attach per RPC, and the per-call timeout in seconds.
#
# Instances are populated through #init. #initialize accepts the same
# arguments (all optional) so that a bare RpcConfig.new already exposes
# usable, non-nil values instead of nil attributes.
class RpcConfig
  attr_reader :rpcs_to_send, :metadata_to_send, :timeout_sec

  # All arguments are optional; see #init for their meaning.
  def initialize(rpcs_to_send = [], metadata_to_send = {}, timeout_sec = 0)
    init(rpcs_to_send, metadata_to_send, timeout_sec)
  end

  # Stores the three configuration values on this instance.
  #
  # rpcs_to_send     - collection of RPC identifiers to send
  # metadata_to_send - Hash keyed by RPC identifier, each value a Hash of
  #                    metadata key => metadata value
  # timeout_sec      - per-call timeout in seconds (defaults to 0)
  def init(rpcs_to_send, metadata_to_send, timeout_sec = 0)
    @rpcs_to_send = rpcs_to_send
    @metadata_to_send = metadata_to_send
    @timeout_sec = timeout_sec
  end
end

# Some global constant mappings: command-line RPC name => RPC symbol.
# Frozen because it is a read-only lookup table.
$RPC_MAP = {
  'UnaryCall' => :UNARY_CALL,
  'EmptyCall' => :EMPTY_CALL,
}.freeze

# Some global variables to be shared by server and client
$watchers = Array.new
$watchers_mutex = Mutex.new
$watchers_cv = ConditionVariable.new
$shutdown = false
# These can be configured by the test runner dynamically
$rpc_config = RpcConfig.new
$rpc_config.init([:UNARY_CALL], {})
# These stats are shared across threads
$thread_results = Array.new
$thread_results_mu = Mutex.new
$accumulated_stats_mu = Mutex.new
$num_rpcs_started_by_method = {}
$num_rpcs_succeeded_by_method = {}
$num_rpcs_failed_by_method = {}
$accumulated_method_stats = {}

# RubyLogger defines a logger for gRPC based on the standard ruby logger.
module RubyLogger
  def logger
    LOGGER
  end

  LOGGER = Logger.new(STDOUT)
  LOGGER.level = Logger::INFO
end

# GRPC is the general RPC module
module GRPC
  # Make GRPC.logger return the STDOUT logger defined in RubyLogger.
  extend RubyLogger
end

# Creates a test stub connected insecurely to opts.server.
def create_stub(opts)
  address = "#{opts.server}"
  GRPC.logger.info("... connecting insecurely to #{address}")
  Grpc::Testing::TestService::Stub.new(
    address,
    :this_channel_is_insecure,
  )
end

# StatsPerMethod accumulates, for one RPC method, the number of RPCs started
# and a histogram of results keyed by status code.
class StatsPerMethod
  attr_reader :rpcs_started, :result

  def initialize()
    @rpcs_started = 0
    # Missing status codes read as 0 so they can be incremented directly.
    @result = Hash.new(0)
  end

  def increment_rpcs_started()
    @rpcs_started += 1
  end

  def add_result(status_code)
    @result[status_code] += 1
  end
end

# This implements XdsUpdateClientConfigureService, which lets a caller
# replace the global $rpc_config while the client is running.
class ConfigureTarget < Grpc::Testing::XdsUpdateClientConfigureService::Service
  include Grpc::Testing

  # Builds a new RpcConfig from the request (RPC types, per-type metadata,
  # timeout) and installs it as $rpc_config.
  #
  # Returns an empty ClientConfigureResponse.
  def configure(req, _call)
    metadata_to_send = {}
    req.metadata.each do |m|
      # group metadata entries by the RPC type they apply to
      (metadata_to_send[m.type] ||= {})[m.key] = m.value
    end
    new_rpc_config = RpcConfig.new
    new_rpc_config.init(req['types'], metadata_to_send, req['timeout_sec'])
    # A single assignment publishes the fully-built config object.
    $rpc_config = new_rpc_config
    ClientConfigureResponse.new()
  end
end

# This implements LoadBalancerStatsService required by the test runner
class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service
  include Grpc::Testing

  # Registers a watcher, then blocks until the send loop has counted
  # req['num_rpcs'] rounds into it or req['timeout_sec'] seconds have passed,
  # and reports what the watcher collected.
  #
  # Returns a LoadBalancerStatsResponse. num_failures is the number of RPCs
  # recorded without a remote peer plus the rounds still outstanding.
  def get_client_stats(req, _call)
    finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) +
                  req['timeout_sec']
    watcher = {}
    $watchers_mutex.synchronize do
      watcher = {
        "rpcs_by_method" => Hash.new(),
        "rpcs_by_peer" => Hash.new(0),
        "rpcs_needed" => req['num_rpcs'],
        "no_remote_peer" => 0
      }
      $watchers << watcher
      seconds_remaining = finish_time -
                          Process.clock_gettime(Process::CLOCK_MONOTONIC)
      while watcher['rpcs_needed'] > 0 && seconds_remaining > 0
        $watchers_cv.wait($watchers_mutex, seconds_remaining)
        seconds_remaining = finish_time -
                            Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end
      # Remove by object identity. Array#index compares with ==, and two
      # watcher hashes with equal contents are ==, so an index-based removal
      # could drop another request's watcher (or raise when nothing matches).
      $watchers.delete_if { |w| w.equal?(watcher) }
    end
    # convert results into proper proto object
    rpcs_by_method = {}
    watcher['rpcs_by_method'].each do |rpc_name, rpcs_by_peer|
      rpcs_by_method[rpc_name] = LoadBalancerStatsResponse::RpcsByPeer.new(
        rpcs_by_peer: rpcs_by_peer
      )
    end
    LoadBalancerStatsResponse.new(
      rpcs_by_method: rpcs_by_method,
      rpcs_by_peer: watcher['rpcs_by_peer'],
      num_failures: watcher['no_remote_peer'] + watcher['rpcs_needed']
    )
  end

  # Returns a LoadBalancerAccumulatedStatsResponse built from the global
  # started/succeeded/failed counters and the per-method stats, read while
  # holding $accumulated_stats_mu.
  def get_client_accumulated_stats(req, _call)
    $accumulated_stats_mu.synchronize do
      all_stats_per_method = $accumulated_method_stats.map { |rpc, stats_per_method|
        [rpc,
         LoadBalancerAccumulatedStatsResponse::MethodStats.new(
           rpcs_started: stats_per_method.rpcs_started,
           result: stats_per_method.result
         )]
      }.to_h
      LoadBalancerAccumulatedStatsResponse.new(
        num_rpcs_started_by_method: $num_rpcs_started_by_method,
        num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method,
        num_rpcs_failed_by_method: $num_rpcs_failed_by_method,
        stats_per_method: all_stats_per_method,
      )
    end
  end
end

# Executes op on a new thread and records the outcome.
#
# On success the succeeded counter for rpc is incremented and result code 0
# is recorded; on GRPC::BadStatus the failed counter is incremented and the
# error's code is recorded. Either way [rpc, remote_peer] is appended to
# $thread_results, where remote_peer is the 'hostname' entry of op.metadata
# or "" when it is absent or the call failed. Other exception types are not
# rescued here.
#
# Returns the Thread running the call.
def execute_rpc_in_thread(op, rpc)
  Thread.new {
    rpc_stats_key = rpc.to_s
    remote_peer = ""
    begin
      op.execute
      if op.metadata.key?('hostname')
        remote_peer = op.metadata['hostname']
      end
      $accumulated_stats_mu.synchronize do
        $num_rpcs_succeeded_by_method[rpc_stats_key] += 1
        $accumulated_method_stats[rpc_stats_key].add_result(0)
      end
    rescue GRPC::BadStatus => e
      $accumulated_stats_mu.synchronize do
        $num_rpcs_failed_by_method[rpc_stats_key] += 1
        $accumulated_method_stats[rpc_stats_key].add_result(e.code)
      end
    end
    $thread_results_mu.synchronize do
      $thread_results << [rpc, remote_peer]
    end
  }
end

# send 1 rpc every 1/qps second
#
# stub                        - stub used to issue unary_call / empty_call
# target_seconds_between_rpcs - interval between rounds, in seconds
# fail_on_failed_rpcs         - accepted for the caller's sake; not read by
#                               this method
#
# Runs until $shutdown is set, then joins the RPC threads still tracked.
def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
  include Grpc::Testing
  simple_req = SimpleRequest.new()
  empty_req = Empty.new()
  # Watcher stats expect rpc names in the form of UnaryCall or EmptyCall,
  # not the underscore-case all-caps form. The reverse mapping never changes,
  # so build it once instead of once per watcher per result.
  rpc_display_names = $RPC_MAP.invert
  target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # Some RPCs are meant to be "kept open". Since Ruby does not have an
  # async API, we are executing those RPCs in a thread so that they don't
  # block.
  keep_open_threads = Array.new
  while !$shutdown
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_seconds = target_next_start - now
    if sleep_seconds < 0
      # running behind schedule: restart the schedule from now
      target_next_start = now + target_seconds_between_rpcs
    else
      target_next_start += target_seconds_between_rpcs
      sleep(sleep_seconds)
    end
    # Read the global once per round: the configure RPC may replace
    # $rpc_config at any time, and this round must use a single config for
    # its timeout, RPC list and metadata.
    rpc_config = $rpc_config
    deadline_sec = rpc_config.timeout_sec > 0 ? rpc_config.timeout_sec : 30
    deadline = GRPC::Core::TimeConsts::from_relative_time(deadline_sec)
    results = {}
    # Forget threads that already finished so the list does not grow for
    # the whole lifetime of the client.
    keep_open_threads.select!(&:alive?)
    rpc_config.rpcs_to_send.each do |rpc|
      # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here
      metadata = rpc_config.metadata_to_send.fetch(rpc, {})
      $accumulated_stats_mu.synchronize do
        $num_rpcs_started_by_method[rpc.to_s] += 1
        $accumulated_method_stats[rpc.to_s].increment_rpcs_started()
      end
      if rpc == :UNARY_CALL
        op = stub.unary_call(simple_req,
                             metadata: metadata,
                             deadline: deadline,
                             return_op: true)
      elsif rpc == :EMPTY_CALL
        op = stub.empty_call(empty_req,
                             metadata: metadata,
                             deadline: deadline,
                             return_op: true)
      else
        raise "Unsupported rpc #{rpc}"
      end
      keep_open_threads << execute_rpc_in_thread(op, rpc)
    end
    # collect results from threads
    $thread_results_mu.synchronize do
      $thread_results.each do |r|
        rpc_name, remote_peer = r
        results[rpc_name] = remote_peer
      end
      $thread_results = Array.new
    end
    $watchers_mutex.synchronize do
      $watchers.each do |watcher|
        # A watcher that already has everything it asked for may still be
        # registered until its waiting thread wakes up; leave it untouched
        # so rpcs_needed never drops below zero.
        next unless watcher['rpcs_needed'] > 0
        # this is counted once when each group of all rpcs_to_send were done
        watcher['rpcs_needed'] -= 1
        results.each do |rpc, remote_peer|
          rpc_name = rpc_display_names[rpc]
          if remote_peer.strip.empty?
            # error is counted per individual RPC
            watcher['no_remote_peer'] += 1
          else
            watcher['rpcs_by_method'][rpc_name] ||= Hash.new(0)
            # increment the remote hostname distribution histogram
            # both by overall, and broken down per RPC
            watcher['rpcs_by_method'][rpc_name][remote_peer] += 1
            watcher['rpcs_by_peer'][remote_peer] += 1
          end
        end
      end
      $watchers_cv.broadcast
    end
  end
  keep_open_threads.each { |thd| thd.join }
end

# Args is used to hold the command line info.
Args = Struct.new(:fail_on_failed_rpcs, :num_channels,
                  :rpc, :metadata,
                  :server, :stats_port, :qps)

# Validates the command line options, returning them as an Args struct.
def parse_args
  # Defaults for the flags that are optional; --server, --stats_port and
  # --qps have no default and stay nil unless given.
  args = Args.new
  args['fail_on_failed_rpcs'] = false
  args['num_channels'] = 1
  args['rpc'] = 'UnaryCall'
  args['metadata'] = ''
  OptionParser.new do |opts|
    opts.on('--fail_on_failed_rpcs BOOL', ['false', 'true']) do |v|
      args['fail_on_failed_rpcs'] = v == 'true'
    end
    opts.on('--num_channels CHANNELS', 'number of channels') do |v|
      args['num_channels'] = v.to_i
    end
    opts.on('--rpc RPCS_TO_SEND', 'list of RPCs to send') do |v|
      args['rpc'] = v
    end
    opts.on('--metadata METADATA_TO_SEND', 'metadata to send per RPC') do |v|
      args['metadata'] = v
    end
    opts.on('--server SERVER_HOST', 'server hostname') do |v|
      GRPC.logger.info("ruby xds: server address is #{v}")
      args['server'] = v
    end
    opts.on('--stats_port STATS_PORT', 'stats port') do |v|
      GRPC.logger.info("ruby xds: stats port is #{v}")
      args['stats_port'] = v
    end
    opts.on('--qps QPS', 'qps') do |v|
      GRPC.logger.info("ruby xds: qps is #{v}")
      args['qps'] = v
    end
  end.parse!
  args
end

# Entry point: starts the stats/configure server, installs the initial RPC
# configuration from the command line, and runs one send loop per channel
# until the server thread ends.
#
# Raises ArgumentError when --qps is missing or not positive, or when --rpc
# names an RPC that is not in $RPC_MAP.
def main
  opts = parse_args

  # 1.0 / 0 would give an infinite interval between RPCs, so reject a
  # missing or non-positive qps up front with a clear message.
  qps = opts['qps'].to_f
  unless qps > 0
    raise ArgumentError,
          "--qps must be a positive number, got #{opts['qps'].inspect}"
  end

  # This server hosts the LoadBalancerStatsService
  host = "0.0.0.0:#{opts['stats_port']}"
  s = GRPC::RpcServer.new
  s.add_http2_port(host, :this_port_is_insecure)
  s.handle(TestTarget)
  s.handle(ConfigureTarget)
  server_thread = Thread.new {
    # run the server until the main test runner terminates this process
    s.run_till_terminated_or_interrupted(['TERM'])
  }

  # Initialize stats
  $RPC_MAP.values.each do |rpc|
    $num_rpcs_started_by_method[rpc.to_s] = 0
    $num_rpcs_succeeded_by_method[rpc.to_s] = 0
    $num_rpcs_failed_by_method[rpc.to_s] = 0
    $accumulated_method_stats[rpc.to_s] = StatsPerMethod.new
  end

  # The client just sends rpcs continuously in a regular interval
  # NOTE(review): a single stub is shared by all --num_channels threads;
  # confirm whether one stub per channel was intended.
  stub = create_stub(opts)
  target_seconds_between_rpcs = (1.0 / qps)
  # Convert 'metadata' input in the form of
  #   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
  # into
  #   {
  #     'rpc1' => {
  #       'k1' => 'v1',
  #       'k3' => 'v3',
  #     },
  #     'rpc2' => {
  #       'k2' => 'v2'
  #     },
  #   }
  rpcs_to_send = []
  metadata_to_send = {}
  if opts['metadata']
    opts['metadata'].split(',').each do |e|
      # Limit the split to three fields so a value that itself contains ':'
      # is kept whole instead of being cut at its first colon.
      (rpc_name, metadata_key, metadata_value) = e.split(':', 3)
      rpc = $RPC_MAP[rpc_name]
      if rpc.nil? || metadata_key.nil? || metadata_value.nil?
        GRPC.logger.warn("ruby xds: ignoring malformed metadata entry #{e}")
        next
      end
      # initialize if we haven't seen this rpc yet
      (metadata_to_send[rpc] ||= {})[metadata_key] = metadata_value
    end
  end
  if opts['rpc']
    rpcs_to_send = opts['rpc'].split(',')
  end
  unless rpcs_to_send.empty?
    # An unknown name would map to nil and only fail later inside a client
    # thread; fail here instead, naming the offending value.
    rpcs_to_send.map! do |rpc|
      $RPC_MAP.fetch(rpc) { raise ArgumentError, "Unsupported rpc #{rpc}" }
    end
    new_rpc_config = RpcConfig.new
    new_rpc_config.init(rpcs_to_send, metadata_to_send)
    $rpc_config = new_rpc_config
  end
  client_threads = Array.new
  opts['num_channels'].times {
    client_threads << Thread.new {
      run_test_loop(stub, target_seconds_between_rpcs,
                    opts['fail_on_failed_rpcs'])
    }
  }

  server_thread.join
  # The server has stopped: tell the send loops to finish and wait for them.
  $shutdown = true
  client_threads.each { |thd| thd.join }
end

if __FILE__ == $0
  main
end