xref: /aosp_15_r20/external/grpc-grpc/examples/ruby/pubsub/pubsub_demo.rb (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker#!/usr/bin/env ruby
2*cc02d7e2SAndroid Build Coastguard Worker
3*cc02d7e2SAndroid Build Coastguard Worker# Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker#
5*cc02d7e2SAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker# you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker# You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker#
9*cc02d7e2SAndroid Build Coastguard Worker#     http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker#
11*cc02d7e2SAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker# See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker# limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker
17*cc02d7e2SAndroid Build Coastguard Worker# pubsub_demo demos accesses the Google PubSub API via its gRPC interface
18*cc02d7e2SAndroid Build Coastguard Worker#
19*cc02d7e2SAndroid Build Coastguard Worker# $ GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_key_file> \
20*cc02d7e2SAndroid Build Coastguard Worker#   path/to/pubsub_demo.rb \
21*cc02d7e2SAndroid Build Coastguard Worker#   [--action=<chosen_demo_action> ]
22*cc02d7e2SAndroid Build Coastguard Worker#
23*cc02d7e2SAndroid Build Coastguard Worker# There are options related to the chosen action, see #parse_args below.
24*cc02d7e2SAndroid Build Coastguard Worker# - the possible actions are given by the method names of NamedAction class
25*cc02d7e2SAndroid Build Coastguard Worker# - the default action is list_some_topics
26*cc02d7e2SAndroid Build Coastguard Worker
27*cc02d7e2SAndroid Build Coastguard Workerthis_dir = File.expand_path(File.dirname(__FILE__))
28*cc02d7e2SAndroid Build Coastguard Workerlib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
29*cc02d7e2SAndroid Build Coastguard Worker$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
30*cc02d7e2SAndroid Build Coastguard Worker$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
31*cc02d7e2SAndroid Build Coastguard Worker
32*cc02d7e2SAndroid Build Coastguard Workerrequire 'optparse'
33*cc02d7e2SAndroid Build Coastguard Worker
34*cc02d7e2SAndroid Build Coastguard Workerrequire 'grpc'
35*cc02d7e2SAndroid Build Coastguard Workerrequire 'googleauth'
36*cc02d7e2SAndroid Build Coastguard Workerrequire 'google/protobuf'
37*cc02d7e2SAndroid Build Coastguard Worker
38*cc02d7e2SAndroid Build Coastguard Workerrequire 'google/protobuf/empty'
39*cc02d7e2SAndroid Build Coastguard Workerrequire 'tech/pubsub/proto/pubsub'
40*cc02d7e2SAndroid Build Coastguard Workerrequire 'tech/pubsub/proto/pubsub_services'
41*cc02d7e2SAndroid Build Coastguard Worker
42*cc02d7e2SAndroid Build Coastguard Worker# creates a SSL Credentials from the production certificates.
43*cc02d7e2SAndroid Build Coastguard Workerdef ssl_creds
44*cc02d7e2SAndroid Build Coastguard Worker  GRPC::Core::ChannelCredentials.new()
45*cc02d7e2SAndroid Build Coastguard Workerend
46*cc02d7e2SAndroid Build Coastguard Worker
47*cc02d7e2SAndroid Build Coastguard Worker# Builds the metadata authentication update proc.
48*cc02d7e2SAndroid Build Coastguard Workerdef auth_proc(opts)
49*cc02d7e2SAndroid Build Coastguard Worker  auth_creds = Google::Auth.get_application_default
50*cc02d7e2SAndroid Build Coastguard Worker  return auth_creds.updater_proc
51*cc02d7e2SAndroid Build Coastguard Workerend
52*cc02d7e2SAndroid Build Coastguard Worker
53*cc02d7e2SAndroid Build Coastguard Worker# Creates a stub for accessing the publisher service.
54*cc02d7e2SAndroid Build Coastguard Workerdef publisher_stub(opts)
55*cc02d7e2SAndroid Build Coastguard Worker  address = "#{opts.host}:#{opts.port}"
56*cc02d7e2SAndroid Build Coastguard Worker  stub_clz = Tech::Pubsub::PublisherService::Stub # shorter
57*cc02d7e2SAndroid Build Coastguard Worker  GRPC.logger.info("... access PublisherService at #{address}")
58*cc02d7e2SAndroid Build Coastguard Worker  call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
59*cc02d7e2SAndroid Build Coastguard Worker  combined_creds = ssl_creds.compose(call_creds)
60*cc02d7e2SAndroid Build Coastguard Worker  stub_clz.new(address, creds: combined_creds,
61*cc02d7e2SAndroid Build Coastguard Worker               GRPC::Core::Channel::SSL_TARGET => opts.host)
62*cc02d7e2SAndroid Build Coastguard Workerend
63*cc02d7e2SAndroid Build Coastguard Worker
64*cc02d7e2SAndroid Build Coastguard Worker# Creates a stub for accessing the subscriber service.
65*cc02d7e2SAndroid Build Coastguard Workerdef subscriber_stub(opts)
66*cc02d7e2SAndroid Build Coastguard Worker  address = "#{opts.host}:#{opts.port}"
67*cc02d7e2SAndroid Build Coastguard Worker  stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter
68*cc02d7e2SAndroid Build Coastguard Worker  GRPC.logger.info("... access SubscriberService at #{address}")
69*cc02d7e2SAndroid Build Coastguard Worker  call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
70*cc02d7e2SAndroid Build Coastguard Worker  combined_creds = ssl_creds.compose(call_creds)
71*cc02d7e2SAndroid Build Coastguard Worker  stub_clz.new(address, creds: combined_creds,
72*cc02d7e2SAndroid Build Coastguard Worker               GRPC::Core::Channel::SSL_TARGET => opts.host)
73*cc02d7e2SAndroid Build Coastguard Workerend
74*cc02d7e2SAndroid Build Coastguard Worker
75*cc02d7e2SAndroid Build Coastguard Worker# defines methods corresponding to each interop test case.
76*cc02d7e2SAndroid Build Coastguard Workerclass NamedActions
77*cc02d7e2SAndroid Build Coastguard Worker  include Tech::Pubsub
78*cc02d7e2SAndroid Build Coastguard Worker
79*cc02d7e2SAndroid Build Coastguard Worker  # Initializes NamedActions
80*cc02d7e2SAndroid Build Coastguard Worker  #
81*cc02d7e2SAndroid Build Coastguard Worker  # @param pub [Stub] a stub for accessing the publisher service
82*cc02d7e2SAndroid Build Coastguard Worker  # @param sub [Stub] a stub for accessing the publisher service
83*cc02d7e2SAndroid Build Coastguard Worker  # @param args [Args] provides access to the command line
84*cc02d7e2SAndroid Build Coastguard Worker  def initialize(pub, sub, args)
85*cc02d7e2SAndroid Build Coastguard Worker    @pub = pub
86*cc02d7e2SAndroid Build Coastguard Worker    @sub = sub
87*cc02d7e2SAndroid Build Coastguard Worker    @args = args
88*cc02d7e2SAndroid Build Coastguard Worker  end
89*cc02d7e2SAndroid Build Coastguard Worker
90*cc02d7e2SAndroid Build Coastguard Worker  # Removes the test topic if it exists
91*cc02d7e2SAndroid Build Coastguard Worker  def remove_topic
92*cc02d7e2SAndroid Build Coastguard Worker    name = test_topic_name
93*cc02d7e2SAndroid Build Coastguard Worker    p "... removing Topic #{name}"
94*cc02d7e2SAndroid Build Coastguard Worker    @pub.delete_topic(DeleteTopicRequest.new(topic: name))
95*cc02d7e2SAndroid Build Coastguard Worker    p "removed Topic: #{name} OK"
96*cc02d7e2SAndroid Build Coastguard Worker  rescue GRPC::BadStatus => e
97*cc02d7e2SAndroid Build Coastguard Worker    p "Could not delete a topics: rpc failed with '#{e}'"
98*cc02d7e2SAndroid Build Coastguard Worker  end
99*cc02d7e2SAndroid Build Coastguard Worker
100*cc02d7e2SAndroid Build Coastguard Worker  # Creates a test topic
101*cc02d7e2SAndroid Build Coastguard Worker  def create_topic
102*cc02d7e2SAndroid Build Coastguard Worker    name = test_topic_name
103*cc02d7e2SAndroid Build Coastguard Worker    p "... creating Topic #{name}"
104*cc02d7e2SAndroid Build Coastguard Worker    resp = @pub.create_topic(Topic.new(name: name))
105*cc02d7e2SAndroid Build Coastguard Worker    p "created Topic: #{resp.name} OK"
106*cc02d7e2SAndroid Build Coastguard Worker  rescue GRPC::BadStatus => e
107*cc02d7e2SAndroid Build Coastguard Worker    p "Could not create a topics: rpc failed with '#{e}'"
108*cc02d7e2SAndroid Build Coastguard Worker  end
109*cc02d7e2SAndroid Build Coastguard Worker
110*cc02d7e2SAndroid Build Coastguard Worker  # Lists topics in the project
111*cc02d7e2SAndroid Build Coastguard Worker  def list_some_topics
112*cc02d7e2SAndroid Build Coastguard Worker    p 'Listing topics'
113*cc02d7e2SAndroid Build Coastguard Worker    p '-------------_'
114*cc02d7e2SAndroid Build Coastguard Worker    list_project_topics.topic.each { |t| p t.name }
115*cc02d7e2SAndroid Build Coastguard Worker  rescue GRPC::BadStatus => e
116*cc02d7e2SAndroid Build Coastguard Worker    p "Could not list topics: rpc failed with '#{e}'"
117*cc02d7e2SAndroid Build Coastguard Worker  end
118*cc02d7e2SAndroid Build Coastguard Worker
119*cc02d7e2SAndroid Build Coastguard Worker  # Checks if a topics exists in a project
120*cc02d7e2SAndroid Build Coastguard Worker  def check_exists
121*cc02d7e2SAndroid Build Coastguard Worker    name = test_topic_name
122*cc02d7e2SAndroid Build Coastguard Worker    p "... checking for topic #{name}"
123*cc02d7e2SAndroid Build Coastguard Worker    exists = topic_exists?(name)
124*cc02d7e2SAndroid Build Coastguard Worker    p "#{name} is a topic" if exists
125*cc02d7e2SAndroid Build Coastguard Worker    p "#{name} is not a topic" unless exists
126*cc02d7e2SAndroid Build Coastguard Worker  rescue GRPC::BadStatus => e
127*cc02d7e2SAndroid Build Coastguard Worker    p "Could not check for a topics: rpc failed with '#{e}'"
128*cc02d7e2SAndroid Build Coastguard Worker  end
129*cc02d7e2SAndroid Build Coastguard Worker
130*cc02d7e2SAndroid Build Coastguard Worker  # Publishes some messages
131*cc02d7e2SAndroid Build Coastguard Worker  def random_pub_sub
132*cc02d7e2SAndroid Build Coastguard Worker    topic_name, sub_name = test_topic_name, test_sub_name
133*cc02d7e2SAndroid Build Coastguard Worker    create_topic_if_needed(topic_name)
134*cc02d7e2SAndroid Build Coastguard Worker    @sub.create_subscription(Subscription.new(name: sub_name,
135*cc02d7e2SAndroid Build Coastguard Worker                                              topic: topic_name))
136*cc02d7e2SAndroid Build Coastguard Worker    msg_count = rand(10..30)
137*cc02d7e2SAndroid Build Coastguard Worker    msg_count.times do |x|
138*cc02d7e2SAndroid Build Coastguard Worker      msg = PubsubMessage.new(data: "message #{x}")
139*cc02d7e2SAndroid Build Coastguard Worker      @pub.publish(PublishRequest.new(topic: topic_name, message: msg))
140*cc02d7e2SAndroid Build Coastguard Worker    end
141*cc02d7e2SAndroid Build Coastguard Worker    p "Sent #{msg_count} messages to #{topic_name}, checking for them now."
142*cc02d7e2SAndroid Build Coastguard Worker    batch = @sub.pull_batch(PullBatchRequest.new(subscription: sub_name,
143*cc02d7e2SAndroid Build Coastguard Worker                                                 max_events: msg_count))
144*cc02d7e2SAndroid Build Coastguard Worker    ack_ids = batch.pull_responses.map { |x| x.ack_id }
145*cc02d7e2SAndroid Build Coastguard Worker    p "Got #{ack_ids.size} messages; acknowledging them.."
146*cc02d7e2SAndroid Build Coastguard Worker    @sub.acknowledge(AcknowledgeRequest.new(subscription: sub_name,
147*cc02d7e2SAndroid Build Coastguard Worker                                            ack_id: ack_ids))
148*cc02d7e2SAndroid Build Coastguard Worker    p "Test messages were acknowledged OK, deleting the subscription"
149*cc02d7e2SAndroid Build Coastguard Worker    del_req = DeleteSubscriptionRequest.new(subscription: sub_name)
150*cc02d7e2SAndroid Build Coastguard Worker    @sub.delete_subscription(del_req)
151*cc02d7e2SAndroid Build Coastguard Worker  rescue GRPC::BadStatus => e
152*cc02d7e2SAndroid Build Coastguard Worker    p "Could not do random pub sub: rpc failed with '#{e}'"
153*cc02d7e2SAndroid Build Coastguard Worker  end
154*cc02d7e2SAndroid Build Coastguard Worker
155*cc02d7e2SAndroid Build Coastguard Worker  private
156*cc02d7e2SAndroid Build Coastguard Worker
157*cc02d7e2SAndroid Build Coastguard Worker  # test_topic_name is the topic name to use in this test.
158*cc02d7e2SAndroid Build Coastguard Worker  def test_topic_name
159*cc02d7e2SAndroid Build Coastguard Worker    unless @args.topic_name.nil?
160*cc02d7e2SAndroid Build Coastguard Worker      return "/topics/#{@args.project_id}/#{@args.topic_name}"
161*cc02d7e2SAndroid Build Coastguard Worker    end
162*cc02d7e2SAndroid Build Coastguard Worker    now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
163*cc02d7e2SAndroid Build Coastguard Worker    "/topics/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
164*cc02d7e2SAndroid Build Coastguard Worker  end
165*cc02d7e2SAndroid Build Coastguard Worker
166*cc02d7e2SAndroid Build Coastguard Worker  # test_sub_name is the subscription name to use in this test.
167*cc02d7e2SAndroid Build Coastguard Worker  def test_sub_name
168*cc02d7e2SAndroid Build Coastguard Worker    unless @args.sub_name.nil?
169*cc02d7e2SAndroid Build Coastguard Worker      return "/subscriptions/#{@args.project_id}/#{@args.sub_name}"
170*cc02d7e2SAndroid Build Coastguard Worker    end
171*cc02d7e2SAndroid Build Coastguard Worker    now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
172*cc02d7e2SAndroid Build Coastguard Worker    "/subscriptions/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
173*cc02d7e2SAndroid Build Coastguard Worker  end
174*cc02d7e2SAndroid Build Coastguard Worker
175*cc02d7e2SAndroid Build Coastguard Worker  # determines if the topic name exists
176*cc02d7e2SAndroid Build Coastguard Worker  def topic_exists?(name)
177*cc02d7e2SAndroid Build Coastguard Worker    topics = list_project_topics.topic.map { |t| t.name }
178*cc02d7e2SAndroid Build Coastguard Worker    topics.include?(name)
179*cc02d7e2SAndroid Build Coastguard Worker  end
180*cc02d7e2SAndroid Build Coastguard Worker
181*cc02d7e2SAndroid Build Coastguard Worker  def create_topic_if_needed(name)
182*cc02d7e2SAndroid Build Coastguard Worker    return if topic_exists?(name)
183*cc02d7e2SAndroid Build Coastguard Worker    @pub.create_topic(Topic.new(name: name))
184*cc02d7e2SAndroid Build Coastguard Worker  end
185*cc02d7e2SAndroid Build Coastguard Worker
186*cc02d7e2SAndroid Build Coastguard Worker  def list_project_topics
187*cc02d7e2SAndroid Build Coastguard Worker    q = "cloud.googleapis.com/project in (/projects/#{@args.project_id})"
188*cc02d7e2SAndroid Build Coastguard Worker    @pub.list_topics(ListTopicsRequest.new(query: q))
189*cc02d7e2SAndroid Build Coastguard Worker  end
190*cc02d7e2SAndroid Build Coastguard Workerend
191*cc02d7e2SAndroid Build Coastguard Worker
192*cc02d7e2SAndroid Build Coastguard Worker# Args is used to hold the command line info.
193*cc02d7e2SAndroid Build Coastguard WorkerArgs = Struct.new(:host, :port, :action, :project_id, :topic_name,
194*cc02d7e2SAndroid Build Coastguard Worker                  :sub_name)
195*cc02d7e2SAndroid Build Coastguard Worker
196*cc02d7e2SAndroid Build Coastguard Worker# validates the command line options, returning them as an Arg.
197*cc02d7e2SAndroid Build Coastguard Workerdef parse_args
198*cc02d7e2SAndroid Build Coastguard Worker  args = Args.new('pubsub-staging.googleapis.com',
199*cc02d7e2SAndroid Build Coastguard Worker                   443, 'list_some_topics', 'stoked-keyword-656')
200*cc02d7e2SAndroid Build Coastguard Worker  OptionParser.new do |opts|
201*cc02d7e2SAndroid Build Coastguard Worker    opts.on('--server_host SERVER_HOST', 'server hostname') do |v|
202*cc02d7e2SAndroid Build Coastguard Worker      args.host = v
203*cc02d7e2SAndroid Build Coastguard Worker    end
204*cc02d7e2SAndroid Build Coastguard Worker    opts.on('--server_port SERVER_PORT', 'server port') do |v|
205*cc02d7e2SAndroid Build Coastguard Worker      args.port = v
206*cc02d7e2SAndroid Build Coastguard Worker    end
207*cc02d7e2SAndroid Build Coastguard Worker
208*cc02d7e2SAndroid Build Coastguard Worker    # instance_methods(false) gives only the methods defined in that class.
209*cc02d7e2SAndroid Build Coastguard Worker    scenes = NamedActions.instance_methods(false).map { |t| t.to_s }
210*cc02d7e2SAndroid Build Coastguard Worker    scene_list = scenes.join(',')
211*cc02d7e2SAndroid Build Coastguard Worker    opts.on("--action CODE", scenes, {}, 'pick a demo action',
212*cc02d7e2SAndroid Build Coastguard Worker            "  (#{scene_list})") do |v|
213*cc02d7e2SAndroid Build Coastguard Worker      args.action = v
214*cc02d7e2SAndroid Build Coastguard Worker    end
215*cc02d7e2SAndroid Build Coastguard Worker
216*cc02d7e2SAndroid Build Coastguard Worker    # Set the remaining values.
217*cc02d7e2SAndroid Build Coastguard Worker    %w(project_id topic_name sub_name).each do |o|
218*cc02d7e2SAndroid Build Coastguard Worker      opts.on("--#{o} VALUE", "#{o}") do |v|
219*cc02d7e2SAndroid Build Coastguard Worker        args[o] = v
220*cc02d7e2SAndroid Build Coastguard Worker      end
221*cc02d7e2SAndroid Build Coastguard Worker    end
222*cc02d7e2SAndroid Build Coastguard Worker  end.parse!
223*cc02d7e2SAndroid Build Coastguard Worker  _check_args(args)
224*cc02d7e2SAndroid Build Coastguard Workerend
225*cc02d7e2SAndroid Build Coastguard Worker
226*cc02d7e2SAndroid Build Coastguard Workerdef _check_args(args)
227*cc02d7e2SAndroid Build Coastguard Worker  %w(host port action).each do |a|
228*cc02d7e2SAndroid Build Coastguard Worker    if args[a].nil?
229*cc02d7e2SAndroid Build Coastguard Worker      raise OptionParser::MissingArgument.new("please specify --#{a}")
230*cc02d7e2SAndroid Build Coastguard Worker    end
231*cc02d7e2SAndroid Build Coastguard Worker  end
232*cc02d7e2SAndroid Build Coastguard Worker  args
233*cc02d7e2SAndroid Build Coastguard Workerend
234*cc02d7e2SAndroid Build Coastguard Worker
235*cc02d7e2SAndroid Build Coastguard Workerdef main
236*cc02d7e2SAndroid Build Coastguard Worker  args = parse_args
237*cc02d7e2SAndroid Build Coastguard Worker  pub, sub = publisher_stub(args), subscriber_stub(args)
238*cc02d7e2SAndroid Build Coastguard Worker  NamedActions.new(pub, sub, args).method(args.action).call
239*cc02d7e2SAndroid Build Coastguard Workerend
240*cc02d7e2SAndroid Build Coastguard Worker
241*cc02d7e2SAndroid Build Coastguard Workermain
242