1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either expresus or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests for eligibility_eval_tasks.""" 15 16import datetime 17import http 18from unittest import mock 19import uuid 20 21from absl.testing import absltest 22 23from google.rpc import code_pb2 24from fcp.demo import eligibility_eval_tasks 25from fcp.demo import http_actions 26from fcp.protos.federatedcompute import common_pb2 27from fcp.protos.federatedcompute import eligibility_eval_tasks_pb2 28 29_TaskAssignmentMode = ( 30 eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo.TaskAssignmentMode 31) 32 33POPULATION_NAME = 'test/population' 34FORWARDING_INFO = common_pb2.ForwardingInfo( 35 target_uri_prefix='https://forwarding.example/') 36 37 38class EligibilityEvalTasksTest(absltest.TestCase): 39 40 @mock.patch.object(uuid, 'uuid4', return_value=uuid.uuid4(), autospec=True) 41 def test_request_eligibility_eval_task(self, mock_uuid): 42 service = eligibility_eval_tasks.Service(POPULATION_NAME, 43 lambda: FORWARDING_INFO) 44 service.add_task('task1', _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_SINGLE) 45 request = eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest( 46 population_name=POPULATION_NAME) 47 retry_window = common_pb2.RetryWindow() 48 retry_window.delay_min.FromTimedelta(datetime.timedelta(seconds=10)) 49 retry_window.delay_max.FromTimedelta(datetime.timedelta(seconds=30)) 50 self.assertEqual( 51 service.request_eligibility_eval_task(request), 52 eligibility_eval_tasks_pb2.EligibilityEvalTaskResponse( 53 session_id=str(mock_uuid.return_value), 54 task_assignment_forwarding_info=FORWARDING_INFO, 55 no_eligibility_eval_configured=( 56 eligibility_eval_tasks_pb2.NoEligibilityEvalConfigured()), 57 retry_window_if_accepted=retry_window, 58 retry_window_if_rejected=retry_window)) 59 60 def test_request_eligibility_eval_task_with_multiple_assignment(self): 61 service = eligibility_eval_tasks.Service( 62 POPULATION_NAME, lambda: FORWARDING_INFO 63 ) 64 service.add_task('task1', _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_SINGLE) 65 service.add_task('task2', _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_MULTIPLE) 66 67 request = eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest( 68 population_name=POPULATION_NAME, 69 eligibility_eval_task_capabilities=( 70 eligibility_eval_tasks_pb2.EligibilityEvalTaskCapabilities( 71 supports_multiple_task_assignment=True 72 ) 73 ), 74 ) 75 response = service.request_eligibility_eval_task(request) 76 self.assertTrue(response.HasField('eligibility_eval_task')) 77 spec_resource = response.eligibility_eval_task.population_eligibility_spec 78 population_eligibility_spec = ( 79 eligibility_eval_tasks_pb2.PopulationEligibilitySpec.FromString( 80 spec_resource.inline_resource.data 81 ) 82 ) 83 self.assertEqual( 84 population_eligibility_spec, 85 eligibility_eval_tasks_pb2.PopulationEligibilitySpec( 86 task_info=[ 87 eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo( 88 task_name='task1', 89 task_assignment_mode=( 90 _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_SINGLE 91 ), 92 ), 93 eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo( 94 task_name='task2', 95 task_assignment_mode=( 96 _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_MULTIPLE 97 ), 98 ), 99 ], 100 ), 101 ) 102 103 def test_request_eligibility_eval_task_with_wrong_population(self): 104 service = eligibility_eval_tasks.Service(POPULATION_NAME, 105 lambda: FORWARDING_INFO) 106 request = eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest( 107 population_name='other/population') 108 with self.assertRaises(http_actions.HttpError) as cm: 109 service.request_eligibility_eval_task(request) 110 self.assertEqual(cm.exception.code, http.HTTPStatus.NOT_FOUND) 111 112 def test_report_eligibility_eval_task_result(self): 113 service = eligibility_eval_tasks.Service(POPULATION_NAME, 114 lambda: FORWARDING_INFO) 115 request = eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultRequest( 116 population_name=POPULATION_NAME, 117 session_id='session-id', 118 status_code=code_pb2.ABORTED) 119 self.assertEqual( 120 service.report_eligibility_eval_task_result(request), 121 eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultResponse()) 122 123 def test_report_eligibility_eval_task_result_with_wrong_population(self): 124 service = eligibility_eval_tasks.Service(POPULATION_NAME, 125 lambda: FORWARDING_INFO) 126 request = eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultRequest( 127 population_name='other/population', 128 session_id='session-id', 129 status_code=code_pb2.ABORTED) 130 with self.assertRaises(http_actions.HttpError) as cm: 131 service.report_eligibility_eval_task_result(request) 132 self.assertEqual(cm.exception.code, http.HTTPStatus.NOT_FOUND) 133 134 def test_remove_task(self): 135 service = eligibility_eval_tasks.Service( 136 POPULATION_NAME, lambda: FORWARDING_INFO 137 ) 138 service.add_task('task', _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_SINGLE) 139 service.remove_task('task') 140 request = eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest( 141 population_name=POPULATION_NAME, 142 eligibility_eval_task_capabilities=( 143 eligibility_eval_tasks_pb2.EligibilityEvalTaskCapabilities( 144 supports_multiple_task_assignment=True 145 ) 146 ), 147 ) 148 response = service.request_eligibility_eval_task(request) 149 spec_resource = response.eligibility_eval_task.population_eligibility_spec 150 population_eligibility_spec = ( 151 eligibility_eval_tasks_pb2.PopulationEligibilitySpec.FromString( 152 spec_resource.inline_resource.data 153 ) 154 ) 155 self.assertEqual( 156 population_eligibility_spec, 157 eligibility_eval_tasks_pb2.PopulationEligibilitySpec(), 158 ) 159 160 def test_remove_missing_task(self): 161 service = eligibility_eval_tasks.Service( 162 POPULATION_NAME, lambda: FORWARDING_INFO 163 ) 164 with self.assertRaises(KeyError): 165 service.remove_task('does-not-exist') 166 167 168if __name__ == '__main__': 169 absltest.main() 170