xref: /aosp_15_r20/external/grpc-grpc/src/ruby/pb/test/xds_client.rb (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1#!/usr/bin/env ruby
2
3# Copyright 2015 gRPC authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# This is the xDS interop test Ruby client. This is meant to be run by
18# the run_xds_tests.py test runner.
19#
20# Usage: $ tools/run_tests/run_xds_tests.py --test_case=... ...
21#    --client_cmd="path/to/xds_client.rb --server=<hostname> \
22#                                        --stats_port=<port> \
23#                                        --qps=<qps>"
24
# Put the gRPC lib directory and the pb directory on the load path so that
# the generated protobuf files can load grpc.
this_dir = File.expand_path(File.dirname(__FILE__))
pb_dir = File.dirname(this_dir)
lib_dir = File.join(File.dirname(pb_dir), 'lib')
[lib_dir, pb_dir].each do |dir|
  $LOAD_PATH.unshift(dir) unless $LOAD_PATH.include?(dir)
end
31
32require 'optparse'
33require 'logger'
34
35require_relative '../../lib/grpc'
36require 'google/protobuf'
37
38require_relative '../src/proto/grpc/testing/empty_pb'
39require_relative '../src/proto/grpc/testing/messages_pb'
40require_relative '../src/proto/grpc/testing/test_services_pb'
41
# RpcConfig describes what the client loop should send: the list of RPCs,
# the metadata to attach per RPC, and the per-call timeout in seconds.
# Instances are created with RpcConfig.new and then populated via #init.
class RpcConfig
  attr_reader :rpcs_to_send
  attr_reader :metadata_to_send
  attr_reader :timeout_sec

  # Populates the configuration. timeout_sec defaults to 0.
  def init(rpcs_to_send, metadata_to_send, timeout_sec = 0)
    @metadata_to_send = metadata_to_send
    @rpcs_to_send = rpcs_to_send
    @timeout_sec = timeout_sec
  end
end
50
# Maps the RPC names accepted on the command line to the symbols used
# internally by the client loop.
$RPC_MAP = { 'UnaryCall' => :UNARY_CALL, 'EmptyCall' => :EMPTY_CALL }

# State shared between the stats server and the client loops
$watchers = []
$watchers_mutex = Mutex.new
$watchers_cv = ConditionVariable.new
$shutdown = false
# The active RPC configuration; the test runner can replace it dynamically
$rpc_config = RpcConfig.new.tap { |config| config.init([:UNARY_CALL], {}) }
# Per-RPC results and accumulated counters, shared across threads
$thread_results = []
$thread_results_mu = Mutex.new
$accumulated_stats_mu = Mutex.new
$num_rpcs_started_by_method = {}
$num_rpcs_succeeded_by_method = {}
$num_rpcs_failed_by_method = {}
$accumulated_method_stats = {}
73
# RubyLogger provides a #logger method backed by one shared standard-library
# Logger that writes to STDOUT at INFO level.
module RubyLogger
  LOGGER = Logger.new(STDOUT).tap { |log| log.level = Logger::INFO }

  # Returns the shared Logger instance.
  def logger
    LOGGER
  end
end
83
# Reopen the general RPC module so that the module-level GRPC.logger
# resolves to RubyLogger#logger.
module GRPC
  extend(RubyLogger)
end
89
# Builds a TestService stub that talks to opts.server over an insecure
# channel, logging the target address first.
def create_stub(opts)
  target = opts.server.to_s
  GRPC.logger.info("... connecting insecurely to #{target}")
  Grpc::Testing::TestService::Stub.new(target, :this_channel_is_insecure)
end
99
# StatsPerMethod accumulates statistics for one RPC method: how many RPCs
# were started and how many finished with each status code.
class StatsPerMethod
  attr_reader :rpcs_started
  attr_reader :result

  def initialize
    @result = Hash.new(0)
    @rpcs_started = 0
  end

  # Counts one more started RPC and returns the new total.
  def increment_rpcs_started
    @rpcs_started = @rpcs_started.succ
  end

  # Counts one more RPC that finished with status_code and returns the new
  # count for that code.
  def add_result(status_code)
    @result[status_code] = @result[status_code].succ
  end
end
113
# ConfigureTarget implements XdsUpdateClientConfigureService; it lets the
# caller replace the RPC configuration used by the client loop.
class ConfigureTarget < Grpc::Testing::XdsUpdateClientConfigureService::Service
  include Grpc::Testing

  # Builds a new RpcConfig from the request (RPC types, per-RPC metadata and
  # timeout) and installs it as $rpc_config.
  def configure(req, _call)
    # Group the metadata entries by the RPC type they apply to
    metadata_by_rpc = {}
    req.metadata.each do |entry|
      per_rpc = (metadata_by_rpc[entry.type] ||= {})
      per_rpc[entry.key] = entry.value
    end
    $rpc_config = RpcConfig.new.tap do |config|
      config.init(req['types'], metadata_by_rpc, req['timeout_sec'])
    end
    ClientConfigureResponse.new
  end
end
134
# This implements LoadBalancerStatsService required by the test runner
class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service
  include Grpc::Testing

  # Registers a watcher, then blocks until the client loop has counted
  # req['num_rpcs'] RPC groups for it or req['timeout_sec'] seconds have
  # passed, and returns the per-peer / per-method distribution it collected.
  def get_client_stats(req, _call)
    finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) +
                  req['timeout_sec']
    watcher = {
      "rpcs_by_method" => Hash.new(),
      "rpcs_by_peer" => Hash.new(0),
      "rpcs_needed" => req['num_rpcs'],
      "no_remote_peer" => 0
    }
    $watchers_mutex.synchronize do
      $watchers << watcher
      begin
        seconds_remaining = finish_time -
                            Process.clock_gettime(Process::CLOCK_MONOTONIC)
        while watcher['rpcs_needed'] > 0 && seconds_remaining > 0
          $watchers_cv.wait($watchers_mutex, seconds_remaining)
          seconds_remaining = finish_time -
                              Process.clock_gettime(Process::CLOCK_MONOTONIC)
        end
      ensure
        # Remove by object identity. Array#index / Array#delete compare with
        # ==, and two concurrent watchers can hold equal contents, so an
        # equality-based lookup may remove another request's watcher.
        $watchers.delete_if { |w| w.equal?(watcher) }
      end
    end
    # convert results into proper proto object
    rpcs_by_method = {}
    watcher['rpcs_by_method'].each do |rpc_name, rpcs_by_peer|
      rpcs_by_method[rpc_name] = LoadBalancerStatsResponse::RpcsByPeer.new(
        rpcs_by_peer: rpcs_by_peer
      )
    end
    # The client loop may decrement rpcs_needed again before this thread
    # re-acquires the mutex; a negative remainder must not offset failures.
    rpcs_still_needed = [watcher['rpcs_needed'], 0].max
    LoadBalancerStatsResponse.new(
      rpcs_by_method: rpcs_by_method,
      rpcs_by_peer: watcher['rpcs_by_peer'],
      num_failures: watcher['no_remote_peer'] + rpcs_still_needed
    )
  end

  # Returns the counters accumulated since startup: started / succeeded /
  # failed RPCs per method plus the per-method status code histogram. The
  # whole response is built while holding $accumulated_stats_mu.
  def get_client_accumulated_stats(req, _call)
    $accumulated_stats_mu.synchronize do
      all_stats_per_method = $accumulated_method_stats.map { |rpc, stats_per_method|
        [rpc,
         LoadBalancerAccumulatedStatsResponse::MethodStats.new(
          rpcs_started: stats_per_method.rpcs_started,
          result: stats_per_method.result
         )]
      }.to_h
      LoadBalancerAccumulatedStatsResponse.new(
        num_rpcs_started_by_method: $num_rpcs_started_by_method,
        num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method,
        num_rpcs_failed_by_method: $num_rpcs_failed_by_method,
        stats_per_method: all_stats_per_method,
      )
    end
  end
end
192
# Runs op in a new thread and returns that thread. The outcome is recorded in
# the accumulated stats (status 0 on success, the BadStatus code on failure),
# and [rpc, remote_peer] is appended to $thread_results. remote_peer is the
# 'hostname' metadata value of a successful call, or "" otherwise.
def execute_rpc_in_thread(op, rpc)
  Thread.new do
    stats_key = rpc.to_s
    peer = ""
    failure = nil
    begin
      op.execute
      peer = op.metadata['hostname'] if op.metadata.key?('hostname')
    rescue GRPC::BadStatus => e
      failure = e
    end
    $accumulated_stats_mu.synchronize do
      if failure
        $num_rpcs_failed_by_method[stats_key] += 1
        $accumulated_method_stats[stats_key].add_result(failure.code)
      else
        $num_rpcs_succeeded_by_method[stats_key] += 1
        $accumulated_method_stats[stats_key].add_result(0)
      end
    end
    $thread_results_mu.synchronize { $thread_results << [rpc, peer] }
  end
end
217
# Sends one group of RPCs (everything in the current RPC config) every
# target_seconds_between_rpcs seconds, i.e. every 1/qps second, until
# $shutdown is set. Each RPC runs in its own thread; after starting a group,
# the results reported so far by those threads are folded into every
# registered watcher, and the waiting stats requests are woken up.
#
# stub                         - the TestService stub used to send the RPCs
# target_seconds_between_rpcs  - interval between two groups of RPCs
# fail_on_failed_rpcs          - accepted for the caller; not used by the loop
def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
  include Grpc::Testing
  simple_req = SimpleRequest.new()
  empty_req = Empty.new()
  # Watcher stats are keyed by 'UnaryCall' / 'EmptyCall' rather than by the
  # :UNARY_CALL / :EMPTY_CALL symbols; build the reverse mapping once.
  rpc_display_names = $RPC_MAP.invert
  target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # Some RPCs are meant to be "kept open". Since Ruby does not have an
  # async API, we are executing those RPCs in a thread so that they don't
  # block.
  keep_open_threads = Array.new
  while !$shutdown
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_seconds = target_next_start - now
    if sleep_seconds < 0
      # we are behind schedule: restart the schedule from now
      target_next_start = now + target_seconds_between_rpcs
    else
      target_next_start += target_seconds_between_rpcs
      sleep(sleep_seconds)
    end
    # Forget threads that have already finished so the list does not grow
    # for the whole lifetime of the process.
    keep_open_threads.select!(&:alive?)
    # Take one snapshot of the configuration per iteration: the configure
    # service may replace $rpc_config from another thread at any time, and
    # the timeout, RPC list and metadata must come from the same config.
    config = $rpc_config
    timeout_sec = config.timeout_sec
    deadline_sec = timeout_sec > 0 ? timeout_sec : 30
    deadline = GRPC::Core::TimeConsts::from_relative_time(deadline_sec)
    results = {}
    config.rpcs_to_send.each do |rpc|
      # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here
      metadata = config.metadata_to_send.fetch(rpc, {})
      $accumulated_stats_mu.synchronize do
        $num_rpcs_started_by_method[rpc.to_s] += 1
        $accumulated_method_stats[rpc.to_s].increment_rpcs_started()
      end
      if rpc == :UNARY_CALL
        op = stub.unary_call(simple_req,
                             metadata: metadata,
                             deadline: deadline,
                             return_op: true)
      elsif rpc == :EMPTY_CALL
        op = stub.empty_call(empty_req,
                             metadata: metadata,
                             deadline: deadline,
                             return_op: true)
      else
        raise "Unsupported rpc #{rpc}"
      end
      keep_open_threads << execute_rpc_in_thread(op, rpc)
    end
    # collect results from threads
    $thread_results_mu.synchronize do
      $thread_results.each do |r|
        rpc_name, remote_peer = r
        results[rpc_name] = remote_peer
      end
      $thread_results = Array.new
    end
    $watchers_mutex.synchronize do
      $watchers.each do |watcher|
        # this is counted once when each group of all rpcs_to_send were done
        watcher['rpcs_needed'] -= 1
        results.each do |rpc_name, remote_peer|
          display_name = rpc_display_names[rpc_name]
          if remote_peer.strip.empty?
            # error is counted per individual RPC
            watcher['no_remote_peer'] += 1
          else
            if not watcher['rpcs_by_method'].key?(display_name)
              watcher['rpcs_by_method'][display_name] = Hash.new(0)
            end
            # increment the remote hostname distribution histogram
            # both by overall, and broken down per RPC
            watcher['rpcs_by_method'][display_name][remote_peer] += 1
            watcher['rpcs_by_peer'][remote_peer] += 1
          end
        end
      end
      $watchers_cv.broadcast
    end
  end
  keep_open_threads.each { |thd| thd.join }
end
298
# Args holds the values parsed from the command line.
Args = Struct.new(
  *%i[fail_on_failed_rpcs num_channels rpc metadata server stats_port qps]
)
303
# Parses the command line options from ARGV and returns them as an Args
# struct. fail_on_failed_rpcs, num_channels, rpc and metadata have defaults;
# server, stats_port and qps stay nil (and are logged) unless given.
def parse_args
  args = Args.new(false, 1, 'UnaryCall', '')
  parser = OptionParser.new
  parser.on('--fail_on_failed_rpcs BOOL', ['false', 'true']) do |value|
    args.fail_on_failed_rpcs = (value == 'true')
  end
  parser.on('--num_channels CHANNELS', 'number of channels') do |value|
    args.num_channels = value.to_i
  end
  parser.on('--rpc RPCS_TO_SEND', 'list of RPCs to send') do |value|
    args.rpc = value
  end
  parser.on('--metadata METADATA_TO_SEND', 'metadata to send per RPC') do |value|
    args.metadata = value
  end
  parser.on('--server SERVER_HOST', 'server hostname') do |value|
    GRPC.logger.info("ruby xds: server address is #{value}")
    args.server = value
  end
  parser.on('--stats_port STATS_PORT', 'stats port') do |value|
    GRPC.logger.info("ruby xds: stats port is #{value}")
    args.stats_port = value
  end
  parser.on('--qps QPS', 'qps') do |value|
    GRPC.logger.info("ruby xds: qps is #{value}")
    args.qps = value
  end
  # consumes the recognized options from ARGV
  parser.parse!
  args
end
339
# Entry point: starts the stats/configure server on the stats port, builds
# the initial RPC configuration from the command line, runs one client loop
# thread per channel, and shuts the loops down once the server terminates.
def main
  opts = parse_args

  # This server hosts the LoadBalancerStatsService
  host = "0.0.0.0:#{opts['stats_port']}"
  s = GRPC::RpcServer.new
  s.add_http2_port(host, :this_port_is_insecure)
  s.handle(TestTarget)
  s.handle(ConfigureTarget)
  server_thread = Thread.new {
    # run the server until the main test runner terminates this process
    s.run_till_terminated_or_interrupted(['TERM'])
  }

  # Initialize stats
  $RPC_MAP.values.each do |rpc|
    $num_rpcs_started_by_method[rpc.to_s] = 0
    $num_rpcs_succeeded_by_method[rpc.to_s] = 0
    $num_rpcs_failed_by_method[rpc.to_s] = 0
    $accumulated_method_stats[rpc.to_s] = StatsPerMethod.new
  end

  # The client just sends rpcs continuously in a regular interval
  stub = create_stub(opts)
  target_seconds_between_rpcs = (1.0 / opts['qps'].to_f)
  # Convert 'metadata' input in the form of
  #   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
  # into
  #   {
  #     'rpc1' => {
  #       'k1' => 'v1',
  #       'k3' => 'v3',
  #     },
  #     'rpc2' => {
  #       'k2' => 'v2'
  #     },
  #   }
  # (the rpc names are translated through $RPC_MAP)
  rpcs_to_send = []
  metadata_to_send = {}
  if opts['metadata']
    metadata_entries = opts['metadata'].split(',')
    metadata_entries.each do |e|
      # Split into at most three fields so that a metadata value which
      # itself contains ':' is kept whole instead of being truncated.
      (rpc_name, metadata_key, metadata_value) = e.split(':', 3)
      rpc_name = $RPC_MAP[rpc_name]
      # initialize if we haven't seen this rpc_name yet
      if !metadata_to_send.key?(rpc_name)
        metadata_to_send[rpc_name] = {}
      end
      metadata_to_send[rpc_name][metadata_key] = metadata_value
    end
  end
  if opts['rpc']
    rpcs_to_send = opts['rpc'].split(',')
  end
  if rpcs_to_send.size > 0
    rpcs_to_send.map! { |rpc| $RPC_MAP[rpc] }
    new_rpc_config = RpcConfig.new
    new_rpc_config.init(rpcs_to_send, metadata_to_send)
    $rpc_config = new_rpc_config
  end
  client_threads = Array.new
  opts['num_channels'].times {
    client_threads << Thread.new {
      run_test_loop(stub, target_seconds_between_rpcs,
                    opts['fail_on_failed_rpcs'])
    }
  }

  # Block until the stats server stops, then let the client loops finish
  server_thread.join
  $shutdown = true
  client_threads.each { |thd| thd.join }
end
412
# Run the client only when this file is executed directly.
main if __FILE__ == $PROGRAM_NAME
416