1#!/usr/bin/env python3
2# Copyright 2015 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Run interop (cross-language) tests in parallel."""
16
17from __future__ import print_function
18
19import argparse
20import atexit
21import itertools
22import json
23import multiprocessing
24import os
25import re
26import subprocess
27import sys
28import tempfile
29import time
30import traceback
31import uuid
32
33import six
34
35import python_utils.dockerjob as dockerjob
36import python_utils.jobset as jobset
37import python_utils.report_utils as report_utils
38
# Docker doesn't clean up after itself, so we do it on exit: `stty echo`
# switches terminal echo back on when the interpreter shuts down.
atexit.register(subprocess.call, ['stty', 'echo'])

# All relative paths used below are resolved against ROOT, the directory two
# levels above this script, which becomes the working directory.
ROOT = os.path.abspath(
    os.path.join(os.path.dirname(sys.argv[0]), os.pardir, os.pardir))
os.chdir(ROOT)

# Ports passed to the fake servers via their --port flag.
_FALLBACK_SERVER_PORT = 443
_BALANCER_SERVER_PORT = 12000
_BACKEND_SERVER_PORT = 8080

# Timeout, in seconds, of each client test job.
_TEST_TIMEOUT = 30

# Key under which the fake servers' image tag is kept in docker_images.
_FAKE_SERVERS_SAFENAME = 'fake_servers'

# Use a name that's verified by the test certs
_SERVICE_NAME = 'server.test.google.fr'
55
56
class CXXLanguage:
    """Launch settings for the C++ interop client."""

    def __init__(self):
        # safename is the identifier used for docker image and container
        # names; it differs from the display name returned by __str__.
        self.safename = 'cxx'
        self.client_cwd = '/var/local/git/grpc'

    def __str__(self):
        return 'c++'

    def client_cmd(self, args):
        """Returns the client command line: the binary followed by `args`."""
        client_binary = 'bins/opt/interop_client'
        return [client_binary] + args

    def global_env(self):
        """Returns the environment variables to set for the client."""
        env = {}
        # Set c-ares as the resolver, to enable grpclb.
        env['GRPC_DNS_RESOLVER'] = 'ares'
        # Turn on verbose logging.
        env['GRPC_VERBOSITY'] = 'DEBUG'
        env['GRPC_TRACE'] = 'client_channel,glb'
        # Point the ROOTS_PATH env variable at the test CA so that
        # GoogleDefaultCredentials is able to use the test CA.
        env['GRPC_DEFAULT_SSL_ROOTS_FILE_PATH'] = (
            '/var/local/git/grpc/src/core/tsi/test_creds/ca.pem')
        return env
87
88
class JavaLanguage:
    """Launch settings for the Java interop client."""

    def __init__(self):
        self.client_cwd = '/var/local/git/grpc-java'
        self.safename = str(self)

    def __str__(self):
        return 'java'

    def client_cmd(self, args):
        """Returns a `bash -c` command that installs the test CA, then runs
        the Java client with `args`.

        The test CA has to be imported into the set of CA's that the Java
        runtime of the docker container picks up, so that Java
        GoogleDefaultCreds can use it.
        """
        shell_steps = [
            # Step 1: convert the PEM test CA to DER.
            ' '.join([
                'openssl x509 -outform der',
                '-in /external_mount/src/core/tsi/test_creds/ca.pem',
                '-out /tmp/test_ca.der',
            ]),
            # Step 2: import the DER certificate into the JRE keystore.
            ' '.join([
                'keytool -import',
                '-keystore /usr/lib/jvm/java-8-oracle/jre/lib/security/cacerts',
                '-file /tmp/test_ca.der',
                '-deststorepass changeit',
                '-noprompt',
            ]),
            # Step 3: run the client itself.
            './run-test-client.sh ' + ' '.join(args),
        ]
        return ['bash', '-c', ' && '.join(shell_steps)]

    def global_env(self):
        """Returns the environment variables to set for the client."""
        java_opts = [
            # Enable grpclb.
            '-Dio.grpc.internal.DnsNameResolverProvider.enable_grpclb=true',
            # Enable verbose logging.
            '-Djava.util.logging.config.file=/var/local/grpc_java_logging/logconf.txt',
        ]
        return {'JAVA_OPTS': ' '.join(java_opts)}
131
132
class GoLanguage:
    """Launch settings for the Go interop client."""

    def __init__(self):
        self.client_cwd = '/go/src/google.golang.org/grpc/interop/client'
        self.safename = str(self)

    def __str__(self):
        return 'go'

    def client_cmd(self, args):
        """Returns a `bash -c` command that installs the test CA, then runs
        the Go client with `args`.
        """
        # Copy the test CA file into the path that the Go runtime in the
        # docker container will use, so that Go's GoogleDefaultCredentials
        # can use it.
        # See https://golang.org/src/crypto/x509/root_linux.go.
        install_test_ca = ('cp /external_mount/src/core/tsi/test_creds/ca.pem '
                           '/etc/ssl/certs/ca-certificates.crt')
        run_client = '/go/bin/client %s' % ' '.join(args)
        return ['bash', '-c', '%s && %s' % (install_test_ca, run_client)]

    def global_env(self):
        """Returns the environment variables to set for the client."""
        env = {}
        env['GRPC_GO_LOG_VERBOSITY_LEVEL'] = '3'
        env['GRPC_GO_LOG_SEVERITY_LEVEL'] = 'INFO'
        return env
160
161
# Supported client languages, keyed by their display name (the value
# accepted by --language).
_LANGUAGES = {
    str(language): language
    for language in (CXXLanguage(), GoLanguage(), JavaLanguage())
}
167
168
def docker_run_cmdline(cmdline, image, docker_args, cwd, environ=None):
    """Wraps given cmdline array to create 'docker run' cmdline from it.

    Args:
      cmdline: list with the command to run inside the container.
      image: docker image to run.
      docker_args: list of extra arguments for 'docker run'.
      cwd: working directory inside the container (passed via -w).
      environ: optional dict of environment variables for the container.

    Returns:
      The complete 'docker run' command line as a list.
    """
    wrapped = ['docker', 'run', '-i', '--rm=true']
    # Each environment variable becomes its own "-e NAME=VALUE" pair.
    for name, value in (environ or {}).items():
        wrapped.extend(['-e', '%s=%s' % (name, value)])
    wrapped.extend(['-w', cwd])
    wrapped.extend(docker_args)
    wrapped.append(image)
    return wrapped + cmdline
177
178
def _job_kill_handler(job):
    """Kills the docker container named by the job's spec."""
    container_name = job._spec.container_name
    assert container_name
    dockerjob.docker_kill(container_name)
182
183
def transport_security_to_args(transport_security):
    """Translates a transport security option into command line flags.

    Args:
      transport_security: 'tls', 'alts', 'insecure' or
        'google_default_credentials'.

    Returns:
      A new list with the flags selecting that transport security.
      Any other option prints an error and exits the process with status 1.
    """
    known_options = (
        ('tls', ['--use_tls=true']),
        ('alts', ['--use_tls=false', '--use_alts=true']),
        ('insecure', ['--use_tls=false']),
        ('google_default_credentials',
         ['--custom_credentials_type=google_default_credentials']),
    )
    for option, flags in known_options:
        if transport_security == option:
            return flags
    print('Invalid transport security option.')
    sys.exit(1)
198
199
def lb_client_interop_jobspec(language,
                              dns_server_ip,
                              docker_image,
                              transport_security='tls'):
    """Creates the jobspec that runs a gRPC client under test in docker.

    Args:
      language: language object providing client_cmd(), global_env(),
        client_cwd and safename (e.g. one of the _LANGUAGES values).
      dns_server_ip: IP address handed to docker's --dns option.
      docker_image: docker image to run the client from.
      transport_security: option understood by transport_security_to_args.

    Returns:
      A jobset.JobSpec for the client, with a container_name attribute set
      so that _job_kill_handler can kill the container.
    """
    interop_only_options = [
        '--server_host=%s' % _SERVICE_NAME,
        '--server_port=%d' % _FALLBACK_SERVER_PORT
    ] + transport_security_to_args(transport_security)
    # Don't set the server host override in any client;
    # Go and Java default to no override.
    # We're using a DNS server so there's no need.
    # The language is identified through str() because the C++ safename is
    # 'cxx', not 'c++'. The override is given an empty value rather than
    # `""`: unlike Go and Java, the C++ command is not wrapped in `bash -c`,
    # so quote characters would be passed through to the client literally.
    if str(language) == 'c++':
        interop_only_options += ['--server_host_override=']
    # Don't set --use_test_ca; we're configuring
    # clients to use test CA's via alternate means.
    interop_only_options += ['--use_test_ca=false']
    client_args = language.client_cmd(interop_only_options)
    container_name = dockerjob.random_name('lb_interop_client_%s' %
                                           language.safename)
    docker_cmdline = docker_run_cmdline(
        client_args,
        environ=language.global_env(),
        image=docker_image,
        cwd=language.client_cwd,
        docker_args=[
            # Resolve names through the DNS server controlled by the test.
            '--dns=%s' % dns_server_ip,
            '--net=host',
            '--name=%s' % container_name,
            # Mount the source tree read-only; the Go and Java client
            # commands read the test CA from /external_mount.
            '-v',
            '{grpc_grpc_root_dir}:/external_mount:ro'.format(
                grpc_grpc_root_dir=ROOT),
        ])
    jobset.message('IDLE',
                   'docker_cmdline:\b|%s|' % ' '.join(docker_cmdline),
                   do_newline=True)
    test_job = jobset.JobSpec(cmdline=docker_cmdline,
                              shortname=('lb_interop_client:%s' % language),
                              timeout_seconds=_TEST_TIMEOUT,
                              kill_handler=_job_kill_handler)
    test_job.container_name = container_name
    return test_job
242
243
def fallback_server_jobspec(transport_security, shortname):
    """Builds the jobspec that runs a fallback server in docker.

    The server listens on _FALLBACK_SERVER_PORT and uses the flags that
    transport_security_to_args returns for `transport_security`.
    """
    server_cmdline = ['bin/server', '--port=%d' % _FALLBACK_SERVER_PORT]
    server_cmdline += transport_security_to_args(transport_security)
    return grpc_server_in_docker_jobspec(shortname=shortname,
                                         server_cmdline=server_cmdline)
252
253
def backend_server_jobspec(transport_security, shortname):
    """Builds the jobspec that runs a backend server in docker.

    The server listens on _BACKEND_SERVER_PORT and uses the flags that
    transport_security_to_args returns for `transport_security`.
    """
    server_cmdline = ['bin/server', '--port=%d' % _BACKEND_SERVER_PORT]
    server_cmdline += transport_security_to_args(transport_security)
    return grpc_server_in_docker_jobspec(shortname=shortname,
                                         server_cmdline=server_cmdline)
262
263
def grpclb_jobspec(transport_security, short_stream, backend_addrs, shortname):
    """Builds the jobspec that runs a balancer server in docker.

    Args:
      transport_security: option understood by transport_security_to_args.
      short_stream: value forwarded to the balancer's --short_stream flag.
      backend_addrs: list of backend address strings, joined with commas
        for the --backend_addrs flag.
      shortname: shortname of the resulting job.
    """
    balancer_flags = [
        '--backend_addrs=%s' % ','.join(backend_addrs),
        '--port=%d' % _BALANCER_SERVER_PORT,
        '--short_stream=%s' % short_stream,
        '--service_name=%s' % _SERVICE_NAME,
    ]
    security_flags = transport_security_to_args(transport_security)
    return grpc_server_in_docker_jobspec(
        shortname=shortname,
        server_cmdline=['bin/fake_grpclb'] + balancer_flags + security_flags)
275
276
def grpc_server_in_docker_jobspec(server_cmdline, shortname):
    """Creates the jobspec that runs a server command in the fake servers image.

    Args:
      server_cmdline: list with the server command to run in the container.
      shortname: shortname of the job; also the base of the container name.

    Returns:
      A jobset.JobSpec with a 30 minute timeout and a container_name
      attribute holding the name of the docker container.
    """
    container_name = dockerjob.random_name(shortname)
    environ = {
        'GRPC_GO_LOG_VERBOSITY_LEVEL': '3',
        # Must be exactly 'INFO' (no surrounding whitespace), matching the
        # value the Go client uses; a padded value is not a valid level.
        'GRPC_GO_LOG_SEVERITY_LEVEL': 'INFO',
    }
    docker_cmdline = docker_run_cmdline(
        server_cmdline,
        cwd='/go',
        image=docker_images.get(_FAKE_SERVERS_SAFENAME),
        environ=environ,
        docker_args=['--name=%s' % container_name])
    jobset.message('IDLE',
                   'docker_cmdline:\b|%s|' % ' '.join(docker_cmdline),
                   do_newline=True)
    server_job = jobset.JobSpec(cmdline=docker_cmdline,
                                shortname=shortname,
                                timeout_seconds=30 * 60)
    server_job.container_name = container_name
    return server_job
297
298
def dns_server_in_docker_jobspec(grpclb_ips, fallback_ips, shortname,
                                 cause_no_error_no_data_for_balancer_a_record):
    """Creates the jobspec that runs the test DNS server in docker.

    Args:
      grpclb_ips: list of balancer IP strings for --grpclb_ips.
      fallback_ips: list of fallback server IP strings for --fallback_ips.
      shortname: shortname of the job; also the base of the container name.
      cause_no_error_no_data_for_balancer_a_record: when truthy, the flag of
        the same name is passed on to the DNS server script.

    Returns:
      A jobset.JobSpec with a 30 minute timeout and a container_name
      attribute holding the name of the docker container.
    """
    container_name = dockerjob.random_name(shortname)
    dns_server_cmdline = [
        'python',
        'test/cpp/naming/utils/run_dns_server_for_lb_interop_tests.py',
    ]
    dns_server_cmdline += [
        '--grpclb_ips=%s' % ','.join(grpclb_ips),
        '--fallback_ips=%s' % ','.join(fallback_ips),
    ]
    if cause_no_error_no_data_for_balancer_a_record:
        dns_server_cmdline += [
            '--cause_no_error_no_data_for_balancer_a_record'
        ]
    docker_cmdline = docker_run_cmdline(
        dns_server_cmdline,
        image=docker_images.get(_FAKE_SERVERS_SAFENAME),
        docker_args=['--name=%s' % container_name],
        cwd='/var/local/git/grpc')
    printable_cmdline = ' '.join(docker_cmdline)
    jobset.message('IDLE',
                   'docker_cmdline:\b|%s|' % printable_cmdline,
                   do_newline=True)
    dns_job = jobset.JobSpec(cmdline=docker_cmdline,
                             shortname=shortname,
                             timeout_seconds=30 * 60)
    dns_job.container_name = container_name
    return dns_job
324
325
def build_interop_image_jobspec(lang_safename, basename_prefix='grpc_interop'):
    """Creates jobspec for building interop docker image for a language.

    Args:
      lang_safename: safename identifying which image to build.
      basename_prefix: prefix of the image's base name.

    Returns:
      A jobset.JobSpec with a 30 minute timeout and a `tag` attribute
      holding the tag the image is built under.
    """
    base_name = '%s_%s' % (basename_prefix, lang_safename)
    # A fresh UUID makes the tag unique to this invocation.
    image_tag = '%s:%s' % (base_name, uuid.uuid4())
    build_job = jobset.JobSpec(
        cmdline=['tools/run_tests/dockerize/build_interop_image.sh'],
        environ={
            'INTEROP_IMAGE': image_tag,
            'BASE_NAME': base_name,
        },
        shortname='build_docker_%s' % lang_safename,
        timeout_seconds=30 * 60)
    build_job.tag = image_tag
    return build_job
340
341
def _parse_bool_flag(value):
    """Converts an explicit command line value of a boolean flag to a bool.

    Plain `type=bool` would turn every non-empty string, including 'false'
    and '0', into True.

    Args:
      value: string given after the flag on the command line.

    Returns:
      True or False; the empty string counts as False.

    Raises:
      argparse.ArgumentTypeError: if the value is not a recognised boolean.
    """
    normalized = value.strip().lower()
    if normalized in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if normalized in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError('expected a boolean value, got %r' %
                                     value)


argp = argparse.ArgumentParser(description='Run interop tests.')
argp.add_argument('-l',
                  '--language',
                  choices=['all'] + sorted(_LANGUAGES),
                  nargs='+',
                  default=['all'],
                  help='Clients to run.')
argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int)
# Required: the scenarios file is opened unconditionally once the docker
# images are available, so a missing value is rejected up front instead.
argp.add_argument('-s',
                  '--scenarios_file',
                  required=True,
                  type=str,
                  help='File containing test scenarios as JSON configs.')
argp.add_argument(
    '-n',
    '--scenario_name',
    default=None,
    type=str,
    help=(
        'Useful for manual runs: specify the name of '
        'the scenario to run from scenarios_file. Run all scenarios if unset.'))
argp.add_argument('--cxx_image_tag',
                  default=None,
                  type=str,
                  help=('Setting this skips the clients docker image '
                        'build step and runs the client from the named '
                        'image. Only supports running a one client language.'))
argp.add_argument('--go_image_tag',
                  default=None,
                  type=str,
                  help=('Setting this skips the clients docker image build '
                        'step and runs the client from the named image. Only '
                        'supports running a one client language.'))
argp.add_argument('--java_image_tag',
                  default=None,
                  type=str,
                  help=('Setting this skips the clients docker image build '
                        'step and runs the client from the named image. Only '
                        'supports running a one client language.'))
argp.add_argument(
    '--servers_image_tag',
    default=None,
    type=str,
    help=('Setting this skips the fake servers docker image '
          'build step and runs the servers from the named image.'))
# The two boolean flags may be given bare (meaning True, via `const`) or
# with an explicit value such as "--verbose false".
argp.add_argument('--no_skips',
                  default=False,
                  type=_parse_bool_flag,
                  nargs='?',
                  const=True,
                  help=('Useful for manual runs. Setting this overrides test '
                        '"skips" configured in test scenarios.'))
argp.add_argument('--verbose',
                  default=False,
                  type=_parse_bool_flag,
                  nargs='?',
                  const=True,
                  help='Increase logging.')
args = argp.parse_args()
401
# Maps an image safename (a client language's safename, or
# _FAKE_SERVERS_SAFENAME) to the docker image tag to run.
docker_images = {}

build_jobs = []
# 'all' selects every supported language wherever it appears among the
# --language values (argparse accepts e.g. "-l go all").
if 'all' in args.language:
    languages = list(_LANGUAGES.keys())
else:
    languages = args.language
# Pre-built client image tags given on the command line, by language name;
# the value is None when the corresponding flag was not set.
prebuilt_client_image_tags = {
    'c++': args.cxx_image_tag,
    'go': args.go_image_tag,
    'java': args.java_image_tag,
}
for lang_name in languages:
    language = _LANGUAGES[lang_name]
    # First check if a pre-built image was supplied, and avoid
    # rebuilding the particular docker image if so.
    prebuilt_tag = prebuilt_client_image_tags.get(lang_name)
    if prebuilt_tag:
        docker_images[str(language.safename)] = prebuilt_tag
    else:
        # Build the test client in docker and save the fully
        # built image.
        job = build_interop_image_jobspec(language.safename)
        build_jobs.append(job)
        docker_images[str(language.safename)] = job.tag

# First check if a pre-built image was supplied.
if args.servers_image_tag:
    docker_images[_FAKE_SERVERS_SAFENAME] = args.servers_image_tag
else:
    # Build the test servers in docker and save the fully
    # built image.
    job = build_interop_image_jobspec(_FAKE_SERVERS_SAFENAME,
                                      basename_prefix='lb_interop')
    build_jobs.append(job)
    docker_images[_FAKE_SERVERS_SAFENAME] = job.tag

if build_jobs:
    jobset.message('START', 'Building interop docker images.', do_newline=True)
    print('Jobs to run: \n%s\n' % '\n'.join(str(j) for j in build_jobs))
    num_failures, _ = jobset.run(build_jobs,
                                 newline_on_success=True,
                                 maxjobs=args.jobs)
    if num_failures == 0:
        jobset.message('SUCCESS',
                       'All docker images built successfully.',
                       do_newline=True)
    else:
        # No point in running any test without the images.
        jobset.message('FAILED',
                       'Failed to build interop docker images.',
                       do_newline=True)
        sys.exit(1)
452
453
def wait_until_dns_server_is_up(dns_server_ip):
    """Probes the DNS server until it's running and safe for tests.

    Makes up to 30 attempts, sleeping 0.1 seconds after each failed one. An
    attempt succeeds when a TCP connection to port 53 can be made and an
    A-record query for the health-check name exits with status 0 and has
    '123.123.123.123' in its output.

    Args:
      dns_server_ip: IP address of the DNS server to probe.

    Raises:
      Exception: if no attempt succeeded.
    """
    # The command lines are the same for every attempt, so build them once.
    tcp_connect_cmdline = [
        os.path.join(os.getcwd(), 'test/cpp/naming/utils/tcp_connect.py'),
        '--server_host', dns_server_ip, '--server_port',
        str(53), '--timeout',
        str(1)
    ]
    dns_resolver_cmdline = [
        os.path.join(os.getcwd(), 'test/cpp/naming/utils/dns_resolver.py'),
        '--qname',
        ('health-check-local-dns-server-is-alive.'
         'resolver-tests.grpctestingexp'), '--server_host', dns_server_ip,
        '--server_port',
        str(53)
    ]
    for _ in range(30):
        print('Health check: attempt to connect to DNS server over TCP.')
        tcp_connect_subprocess = subprocess.Popen(tcp_connect_cmdline)
        tcp_connect_subprocess.communicate()
        if tcp_connect_subprocess.returncode == 0:
            print(('Health check: attempt to make an A-record '
                   'query to DNS server.'))
            dns_resolver_subprocess = subprocess.Popen(dns_resolver_cmdline,
                                                       stdout=subprocess.PIPE)
            dns_resolver_stdout, _unused_stderr = (
                dns_resolver_subprocess.communicate())
            # The pipe is not opened in text mode, so on Python 3 the output
            # is bytes and has to be searched with a bytes needle (a str
            # needle raises TypeError).
            if (dns_resolver_subprocess.returncode == 0 and
                    b'123.123.123.123' in dns_resolver_stdout):
                print(('DNS server is up! '
                       'Successfully reached it over UDP and TCP.'))
                return
        time.sleep(0.1)
    raise Exception(('Failed to reach DNS server over TCP and/or UDP. '
                     'Exitting without running tests.'))
487
488
def shortname(shortname_prefix, shortname, index):
    """Returns '<shortname_prefix>_<shortname>_<index>' as a job shortname."""
    name_parts = (shortname_prefix, shortname, index)
    return '%s_%s_%d' % name_parts
491
492
def run_one_scenario(scenario_config):
    """Runs the selected client languages against one test scenario.

    Starts the scenario's backend, fallback and balancer servers plus a DNS
    server in docker, waits for the DNS server to answer, runs one client
    job per language that is not skipped, and finally shuts all the servers
    down again (also when an exception is raised).

    Args:
      scenario_config: dict describing the scenario. Keys read here: 'name',
        'backend_configs', 'fallback_configs', 'balancer_configs',
        'cause_no_error_no_data_for_balancer_a_record', 'transport_sec' and,
        optionally, 'skip_langs'.

    Returns:
      The number of failures reported by jobset.run for the client jobs.
    """
    jobset.message('START', 'Run scenario: %s' % scenario_config['name'])
    # Every started server job by shortname, so all of them can be stopped.
    server_jobs = {}
    suppress_server_logs = True
    try:
        shortname_prefix = scenario_config['name']

        # Start backends
        backend_addrs = []
        for index, backend_config in enumerate(
                scenario_config['backend_configs']):
            backend_shortname = shortname(shortname_prefix, 'backend_server',
                                          index)
            backend_job = dockerjob.DockerJob(
                backend_server_jobspec(backend_config['transport_sec'],
                                       backend_shortname))
            server_jobs[backend_shortname] = backend_job
            backend_addrs.append(
                '%s:%d' % (backend_job.ip_address(), _BACKEND_SERVER_PORT))

        # Start fallbacks
        fallback_ips = []
        for index, fallback_config in enumerate(
                scenario_config['fallback_configs']):
            fallback_shortname = shortname(shortname_prefix, 'fallback_server',
                                           index)
            fallback_job = dockerjob.DockerJob(
                fallback_server_jobspec(fallback_config['transport_sec'],
                                        fallback_shortname))
            server_jobs[fallback_shortname] = fallback_job
            fallback_ips.append(fallback_job.ip_address())

        # Start balancers
        grpclb_ips = []
        for index, balancer_config in enumerate(
                scenario_config['balancer_configs']):
            grpclb_shortname = shortname(shortname_prefix, 'grpclb_server',
                                         index)
            grpclb_job = dockerjob.DockerJob(
                grpclb_jobspec(balancer_config['transport_sec'],
                               balancer_config['short_stream'], backend_addrs,
                               grpclb_shortname))
            server_jobs[grpclb_shortname] = grpclb_job
            grpclb_ips.append(grpclb_job.ip_address())

        # Start DNS server
        dns_server_shortname = shortname(shortname_prefix, 'dns_server', 0)
        dns_server_job = dockerjob.DockerJob(
            dns_server_in_docker_jobspec(
                grpclb_ips, fallback_ips, dns_server_shortname,
                scenario_config['cause_no_error_no_data_for_balancer_a_record']
            ))
        server_jobs[dns_server_shortname] = dns_server_job
        # Get the IP address of the docker container running the DNS server.
        # The DNS server is running on port 53 of that IP address. Note we will
        # point the DNS resolvers of grpc clients under test to our controlled
        # DNS server by effectively modifying the /etc/resolve.conf "nameserver"
        # lists of their docker containers.
        dns_server_ip = dns_server_job.ip_address()
        wait_until_dns_server_is_up(dns_server_ip)

        # Run clients
        client_jobs = []
        for lang_name in languages:
            # Skip languages that are known to not currently
            # work for this test.
            skip_requested = lang_name in scenario_config.get('skip_langs', [])
            if skip_requested and not args.no_skips:
                jobset.message(
                    'IDLE', 'Skipping scenario: %s for language: %s\n' %
                    (scenario_config['name'], lang_name))
                continue
            client_language = _LANGUAGES[lang_name]
            client_jobs.append(
                lb_client_interop_jobspec(
                    client_language,
                    dns_server_ip,
                    docker_image=docker_images.get(client_language.safename),
                    transport_security=scenario_config['transport_sec']))
        jobset.message(
            'IDLE', 'Jobs to run: \n%s\n' %
            '\n'.join(str(client_job) for client_job in client_jobs))
        num_failures, resultset = jobset.run(client_jobs,
                                             newline_on_success=True,
                                             maxjobs=args.jobs)
        report_utils.render_junit_xml_report(resultset, 'sponge_log.xml')
        if not num_failures:
            jobset.message('SUCCESS',
                           'Scenario: %s. All tests passed' %
                           scenario_config['name'],
                           do_newline=True)
        else:
            # Keep the server logs when something failed.
            suppress_server_logs = False
            jobset.message('FAILED',
                           'Scenario: %s. Some tests failed' %
                           scenario_config['name'],
                           do_newline=True)
        return num_failures
    finally:
        # Check if servers are still running.
        for server_name, server_job in server_jobs.items():
            if server_job.is_running():
                continue
            print('Server "%s" has exited prematurely.' % server_name)
        dockerjob.finish_jobs(
            list(server_jobs.values()),
            suppress_failure=(suppress_server_logs and not args.verbose))
591
592
# Read all scenarios, run the selected ones one after another, and exit with
# status 1 if any of them reported a failure.
with open(args.scenarios_file, 'r') as scenarios_input:
    all_scenarios = json.load(scenarios_input)

num_failures = 0
for scenario in all_scenarios:
    # With --scenario_name set, only the scenario of that name is run.
    if args.scenario_name and args.scenario_name != scenario['name']:
        jobset.message('IDLE', 'Skipping scenario: %s' % scenario['name'])
        continue
    num_failures += run_one_scenario(scenario)
sys.exit(0 if num_failures == 0 else 1)
607