xref: /aosp_15_r20/external/federated-compute/fcp/demo/eligibility_eval_tasks_test.py (revision 14675a029014e728ec732f129a32e299b2da0601)
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either expresus or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for eligibility_eval_tasks."""
15
16import datetime
17import http
18from unittest import mock
19import uuid
20
21from absl.testing import absltest
22
23from google.rpc import code_pb2
24from fcp.demo import eligibility_eval_tasks
25from fcp.demo import http_actions
26from fcp.protos.federatedcompute import common_pb2
27from fcp.protos.federatedcompute import eligibility_eval_tasks_pb2
28
29_TaskAssignmentMode = (
30    eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo.TaskAssignmentMode
31)
32
33POPULATION_NAME = 'test/population'
34FORWARDING_INFO = common_pb2.ForwardingInfo(
35    target_uri_prefix='https://forwarding.example/')
36
37
38class EligibilityEvalTasksTest(absltest.TestCase):
39
40  @mock.patch.object(uuid, 'uuid4', return_value=uuid.uuid4(), autospec=True)
41  def test_request_eligibility_eval_task(self, mock_uuid):
42    service = eligibility_eval_tasks.Service(POPULATION_NAME,
43                                             lambda: FORWARDING_INFO)
44    service.add_task('task1', _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_SINGLE)
45    request = eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest(
46        population_name=POPULATION_NAME)
47    retry_window = common_pb2.RetryWindow()
48    retry_window.delay_min.FromTimedelta(datetime.timedelta(seconds=10))
49    retry_window.delay_max.FromTimedelta(datetime.timedelta(seconds=30))
50    self.assertEqual(
51        service.request_eligibility_eval_task(request),
52        eligibility_eval_tasks_pb2.EligibilityEvalTaskResponse(
53            session_id=str(mock_uuid.return_value),
54            task_assignment_forwarding_info=FORWARDING_INFO,
55            no_eligibility_eval_configured=(
56                eligibility_eval_tasks_pb2.NoEligibilityEvalConfigured()),
57            retry_window_if_accepted=retry_window,
58            retry_window_if_rejected=retry_window))
59
60  def test_request_eligibility_eval_task_with_multiple_assignment(self):
61    service = eligibility_eval_tasks.Service(
62        POPULATION_NAME, lambda: FORWARDING_INFO
63    )
64    service.add_task('task1', _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_SINGLE)
65    service.add_task('task2', _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_MULTIPLE)
66
67    request = eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest(
68        population_name=POPULATION_NAME,
69        eligibility_eval_task_capabilities=(
70            eligibility_eval_tasks_pb2.EligibilityEvalTaskCapabilities(
71                supports_multiple_task_assignment=True
72            )
73        ),
74    )
75    response = service.request_eligibility_eval_task(request)
76    self.assertTrue(response.HasField('eligibility_eval_task'))
77    spec_resource = response.eligibility_eval_task.population_eligibility_spec
78    population_eligibility_spec = (
79        eligibility_eval_tasks_pb2.PopulationEligibilitySpec.FromString(
80            spec_resource.inline_resource.data
81        )
82    )
83    self.assertEqual(
84        population_eligibility_spec,
85        eligibility_eval_tasks_pb2.PopulationEligibilitySpec(
86            task_info=[
87                eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo(
88                    task_name='task1',
89                    task_assignment_mode=(
90                        _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_SINGLE
91                    ),
92                ),
93                eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo(
94                    task_name='task2',
95                    task_assignment_mode=(
96                        _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_MULTIPLE
97                    ),
98                ),
99            ],
100        ),
101    )
102
103  def test_request_eligibility_eval_task_with_wrong_population(self):
104    service = eligibility_eval_tasks.Service(POPULATION_NAME,
105                                             lambda: FORWARDING_INFO)
106    request = eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest(
107        population_name='other/population')
108    with self.assertRaises(http_actions.HttpError) as cm:
109      service.request_eligibility_eval_task(request)
110    self.assertEqual(cm.exception.code, http.HTTPStatus.NOT_FOUND)
111
112  def test_report_eligibility_eval_task_result(self):
113    service = eligibility_eval_tasks.Service(POPULATION_NAME,
114                                             lambda: FORWARDING_INFO)
115    request = eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultRequest(
116        population_name=POPULATION_NAME,
117        session_id='session-id',
118        status_code=code_pb2.ABORTED)
119    self.assertEqual(
120        service.report_eligibility_eval_task_result(request),
121        eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultResponse())
122
123  def test_report_eligibility_eval_task_result_with_wrong_population(self):
124    service = eligibility_eval_tasks.Service(POPULATION_NAME,
125                                             lambda: FORWARDING_INFO)
126    request = eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultRequest(
127        population_name='other/population',
128        session_id='session-id',
129        status_code=code_pb2.ABORTED)
130    with self.assertRaises(http_actions.HttpError) as cm:
131      service.report_eligibility_eval_task_result(request)
132    self.assertEqual(cm.exception.code, http.HTTPStatus.NOT_FOUND)
133
134  def test_remove_task(self):
135    service = eligibility_eval_tasks.Service(
136        POPULATION_NAME, lambda: FORWARDING_INFO
137    )
138    service.add_task('task', _TaskAssignmentMode.TASK_ASSIGNMENT_MODE_SINGLE)
139    service.remove_task('task')
140    request = eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest(
141        population_name=POPULATION_NAME,
142        eligibility_eval_task_capabilities=(
143            eligibility_eval_tasks_pb2.EligibilityEvalTaskCapabilities(
144                supports_multiple_task_assignment=True
145            )
146        ),
147    )
148    response = service.request_eligibility_eval_task(request)
149    spec_resource = response.eligibility_eval_task.population_eligibility_spec
150    population_eligibility_spec = (
151        eligibility_eval_tasks_pb2.PopulationEligibilitySpec.FromString(
152            spec_resource.inline_resource.data
153        )
154    )
155    self.assertEqual(
156        population_eligibility_spec,
157        eligibility_eval_tasks_pb2.PopulationEligibilitySpec(),
158    )
159
160  def test_remove_missing_task(self):
161    service = eligibility_eval_tasks.Service(
162        POPULATION_NAME, lambda: FORWARDING_INFO
163    )
164    with self.assertRaises(KeyError):
165      service.remove_task('does-not-exist')
166
167
168if __name__ == '__main__':
169  absltest.main()
170