xref: /aosp_15_r20/external/autotest/site_utils/pubsub_utils_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/env python3
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Unit test for pubsub_utils.py"""
7
8from __future__ import print_function
9import os
10import unittest
11from unittest.mock import patch
12from unittest.mock import MagicMock
13
14import common
15
16# TODO(crbug.com/1050892): The unittests rely on apiclient in chromite.
17import autotest_lib.utils.frozen_chromite  # pylint: disable=unused-import
18
19from apiclient import discovery
20from oauth2client.client import ApplicationDefaultCredentialsError
21from oauth2client.client import GoogleCredentials
22from googleapiclient.errors import UnknownApiNameOrVersion
23
24from autotest_lib.site_utils import pubsub_utils
25
26_TEST_CLOUD_SERVICE_ACCOUNT_FILE = '/tmp/test-credential'
27
28
29class MockedPubSub(object):
30    """A mocked PubSub handle."""
31    def __init__(self, test, topic, msg, retry, ret_val=None,
32                 raise_except=False):
33        self.test = test
34        self.topic = topic
35        self.msg = msg
36        self.retry = retry
37        self.ret_val = ret_val
38        self.raise_except = raise_except
39
40    def projects(self):
41        """Mocked PubSub projects."""
42        return self
43
44    def topics(self):
45        """Mocked PubSub topics."""
46        return self
47
48    def publish(self, topic, body):
49        """Mocked PubSub publish method.
50
51        @param topic: PubSub topic string.
52        @param body: PubSub notification body.
53        """
54        self.test.assertEquals(self.topic, topic)
55        self.test.assertEquals(self.msg, body['messages'][0])
56        return self
57
58    def execute(self, num_retries):
59        """Mocked PubSub execute method.
60
61        @param num_retries: Number of retries.
62        """
63        self.test.assertEquals(self.retry, num_retries)
64        if self.raise_except:
65            raise Exception()
66        return self.ret_val
67
68
69def _create_sample_message():
70    """Creates a sample pubsub message."""
71    msg_payload = {'data': 'sample data'}
72    msg_attributes = {}
73    msg_attributes['var'] = 'value'
74    msg_payload['attributes'] = msg_attributes
75
76    return msg_payload
77
78
79class PubSubTests(unittest.TestCase):
80    """Tests for pubsub related functios."""
81
82    def setUp(self):
83        patcher = patch.object(os.path, 'isfile')
84        self.isfile_mock = patcher.start()
85        self.addCleanup(patcher.stop)
86        creds_patcher = patch.object(GoogleCredentials, 'from_stream')
87        self.creds_mock = creds_patcher.start()
88        self.addCleanup(creds_patcher.stop)
89
90    def test_pubsub_with_no_service_account(self):
91        """Test getting the pubsub service"""
92        with self.assertRaises(pubsub_utils.PubSubException):
93            pubsub_utils.PubSubClient()
94
95    def test_pubsub_with_non_existing_service_account(self):
96        """Test getting the pubsub service"""
97        self.isfile_mock.return_value = False
98        with self.assertRaises(pubsub_utils.PubSubException):
99            pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
100        self.isfile_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
101
102    def test_pubsub_with_corrupted_service_account(self):
103        """Test pubsub with corrupted service account."""
104
105        self.isfile_mock.return_value = True
106        self.creds_mock.side_effect = ApplicationDefaultCredentialsError
107
108        with self.assertRaises(pubsub_utils.PubSubException):
109            pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
110
111        self.creds_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
112        self.isfile_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
113
114    def test_pubsub_with_invalid_service_account(self):
115        """Test pubsubwith invalid service account."""
116        self.isfile_mock.return_value = True
117        credentials = MagicMock(GoogleCredentials)
118        self.creds_mock.return_value = credentials
119
120        credentials.create_scoped_required.return_value = True
121        credentials.create_scoped.return_value = credentials
122
123        with patch.object(discovery, 'build') as discovery_mock:
124            discovery_mock.side_effect = UnknownApiNameOrVersion
125
126            with self.assertRaises(pubsub_utils.PubSubException):
127                msg = _create_sample_message()
128                pubsub_client = pubsub_utils.PubSubClient(
129                        _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
130                pubsub_client.publish_notifications('test_topic', [msg])
131
132            credentials.create_scoped.assert_called_with(
133                    pubsub_utils.PUBSUB_SCOPES)
134            discovery_mock.assert_called_with(pubsub_utils.PUBSUB_SERVICE_NAME,
135                                              pubsub_utils.PUBSUB_VERSION,
136                                              credentials=credentials)
137        self.creds_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
138        self.isfile_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
139
140    def test_publish_notifications(self):
141        """Test getting the pubsub service"""
142        self.isfile_mock.return_value = True
143        credentials = MagicMock(GoogleCredentials)
144        self.creds_mock.return_value = credentials
145
146        credentials.create_scoped_required.return_value = True
147        credentials.create_scoped.return_value = credentials
148
149        with patch.object(discovery, 'build') as discovery_mock:
150            msg = _create_sample_message()
151            discovery_mock.return_value = MockedPubSub(
152                    self,
153                    'test_topic',
154                    msg,
155                    pubsub_utils.DEFAULT_PUBSUB_NUM_RETRIES,
156                    # use tuple ('123') instead of list just for easy to
157                    # write the test.
158                    ret_val={'messageIds': ('123')})
159
160            pubsub_client = pubsub_utils.PubSubClient(
161                    _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
162            msg_ids = pubsub_client.publish_notifications('test_topic', [msg])
163            self.assertEquals(('123'), msg_ids)
164
165            credentials.create_scoped.assert_called_with(
166                    pubsub_utils.PUBSUB_SCOPES)
167            discovery_mock.assert_called_with(pubsub_utils.PUBSUB_SERVICE_NAME,
168                                              pubsub_utils.PUBSUB_VERSION,
169                                              credentials=credentials)
170        self.creds_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
171        self.isfile_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
172
173
174if __name__ == '__main__':
175    unittest.main()
176