1#!/usr/bin/env python3 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Unit test for pubsub_utils.py""" 7 8from __future__ import print_function 9import os 10import unittest 11from unittest.mock import patch 12from unittest.mock import MagicMock 13 14import common 15 16# TODO(crbug.com/1050892): The unittests rely on apiclient in chromite. 17import autotest_lib.utils.frozen_chromite # pylint: disable=unused-import 18 19from apiclient import discovery 20from oauth2client.client import ApplicationDefaultCredentialsError 21from oauth2client.client import GoogleCredentials 22from googleapiclient.errors import UnknownApiNameOrVersion 23 24from autotest_lib.site_utils import pubsub_utils 25 26_TEST_CLOUD_SERVICE_ACCOUNT_FILE = '/tmp/test-credential' 27 28 29class MockedPubSub(object): 30 """A mocked PubSub handle.""" 31 def __init__(self, test, topic, msg, retry, ret_val=None, 32 raise_except=False): 33 self.test = test 34 self.topic = topic 35 self.msg = msg 36 self.retry = retry 37 self.ret_val = ret_val 38 self.raise_except = raise_except 39 40 def projects(self): 41 """Mocked PubSub projects.""" 42 return self 43 44 def topics(self): 45 """Mocked PubSub topics.""" 46 return self 47 48 def publish(self, topic, body): 49 """Mocked PubSub publish method. 50 51 @param topic: PubSub topic string. 52 @param body: PubSub notification body. 53 """ 54 self.test.assertEquals(self.topic, topic) 55 self.test.assertEquals(self.msg, body['messages'][0]) 56 return self 57 58 def execute(self, num_retries): 59 """Mocked PubSub execute method. 60 61 @param num_retries: Number of retries. 62 """ 63 self.test.assertEquals(self.retry, num_retries) 64 if self.raise_except: 65 raise Exception() 66 return self.ret_val 67 68 69def _create_sample_message(): 70 """Creates a sample pubsub message.""" 71 msg_payload = {'data': 'sample data'} 72 msg_attributes = {} 73 msg_attributes['var'] = 'value' 74 msg_payload['attributes'] = msg_attributes 75 76 return msg_payload 77 78 79class PubSubTests(unittest.TestCase): 80 """Tests for pubsub related functios.""" 81 82 def setUp(self): 83 patcher = patch.object(os.path, 'isfile') 84 self.isfile_mock = patcher.start() 85 self.addCleanup(patcher.stop) 86 creds_patcher = patch.object(GoogleCredentials, 'from_stream') 87 self.creds_mock = creds_patcher.start() 88 self.addCleanup(creds_patcher.stop) 89 90 def test_pubsub_with_no_service_account(self): 91 """Test getting the pubsub service""" 92 with self.assertRaises(pubsub_utils.PubSubException): 93 pubsub_utils.PubSubClient() 94 95 def test_pubsub_with_non_existing_service_account(self): 96 """Test getting the pubsub service""" 97 self.isfile_mock.return_value = False 98 with self.assertRaises(pubsub_utils.PubSubException): 99 pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 100 self.isfile_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 101 102 def test_pubsub_with_corrupted_service_account(self): 103 """Test pubsub with corrupted service account.""" 104 105 self.isfile_mock.return_value = True 106 self.creds_mock.side_effect = ApplicationDefaultCredentialsError 107 108 with self.assertRaises(pubsub_utils.PubSubException): 109 pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 110 111 self.creds_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 112 self.isfile_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 113 114 def test_pubsub_with_invalid_service_account(self): 115 """Test pubsubwith invalid service account.""" 116 self.isfile_mock.return_value = True 117 credentials = MagicMock(GoogleCredentials) 118 self.creds_mock.return_value = credentials 119 120 credentials.create_scoped_required.return_value = True 121 credentials.create_scoped.return_value = credentials 122 123 with patch.object(discovery, 'build') as discovery_mock: 124 discovery_mock.side_effect = UnknownApiNameOrVersion 125 126 with self.assertRaises(pubsub_utils.PubSubException): 127 msg = _create_sample_message() 128 pubsub_client = pubsub_utils.PubSubClient( 129 _TEST_CLOUD_SERVICE_ACCOUNT_FILE) 130 pubsub_client.publish_notifications('test_topic', [msg]) 131 132 credentials.create_scoped.assert_called_with( 133 pubsub_utils.PUBSUB_SCOPES) 134 discovery_mock.assert_called_with(pubsub_utils.PUBSUB_SERVICE_NAME, 135 pubsub_utils.PUBSUB_VERSION, 136 credentials=credentials) 137 self.creds_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 138 self.isfile_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 139 140 def test_publish_notifications(self): 141 """Test getting the pubsub service""" 142 self.isfile_mock.return_value = True 143 credentials = MagicMock(GoogleCredentials) 144 self.creds_mock.return_value = credentials 145 146 credentials.create_scoped_required.return_value = True 147 credentials.create_scoped.return_value = credentials 148 149 with patch.object(discovery, 'build') as discovery_mock: 150 msg = _create_sample_message() 151 discovery_mock.return_value = MockedPubSub( 152 self, 153 'test_topic', 154 msg, 155 pubsub_utils.DEFAULT_PUBSUB_NUM_RETRIES, 156 # use tuple ('123') instead of list just for easy to 157 # write the test. 158 ret_val={'messageIds': ('123')}) 159 160 pubsub_client = pubsub_utils.PubSubClient( 161 _TEST_CLOUD_SERVICE_ACCOUNT_FILE) 162 msg_ids = pubsub_client.publish_notifications('test_topic', [msg]) 163 self.assertEquals(('123'), msg_ids) 164 165 credentials.create_scoped.assert_called_with( 166 pubsub_utils.PUBSUB_SCOPES) 167 discovery_mock.assert_called_with(pubsub_utils.PUBSUB_SERVICE_NAME, 168 pubsub_utils.PUBSUB_VERSION, 169 credentials=credentials) 170 self.creds_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 171 self.isfile_mock.assert_called_with(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 172 173 174if __name__ == '__main__': 175 unittest.main() 176