xref: /aosp_15_r20/external/federated-compute/fcp/demo/eligibility_eval_tasks.py (revision 14675a029014e728ec732f129a32e299b2da0601)
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Action handlers for the EligibilityEvalTasks service.
15
16Eligibility Eval tasks are not currently supported by this demo implementation.
17"""
18
19import dataclasses
20import datetime
21import http
22import threading
23from typing import Callable
24import uuid
25
26from absl import logging
27
28from google.rpc import code_pb2
29from fcp.demo import http_actions
30from fcp.protos.federatedcompute import common_pb2
31from fcp.protos.federatedcompute import eligibility_eval_tasks_pb2
32
# Short module-level alias for the TaskAssignmentMode type nested inside
# PopulationEligibilitySpec.TaskInfo, so annotations in this module stay
# readable.
_TaskAssignmentMode = (
    eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo.TaskAssignmentMode
)
36
37
@dataclasses.dataclass(frozen=True)
class _Task:
  """Immutable record describing one task known to the service."""
  # Name of the task; Service also uses it as the key of its task dict.
  task_name: str
  # Assignment mode copied into the TaskInfo entry sent to clients.
  task_assignment_mode: _TaskAssignmentMode
42
43
class Service:
  """Implements the EligibilityEvalTasks service.

  The service never hands out a real Eligibility Eval task. It tracks the set
  of known tasks so that it can serve a PopulationEligibilitySpec to clients
  that support multiple task assignment, and it logs the results clients
  report back.

  The task registry is guarded by a lock, so `add_task` / `remove_task` may be
  called while requests are being handled.
  """

  def __init__(self, population_name: str,
               forwarding_info: Callable[[], common_pb2.ForwardingInfo]):
    """Initializes the service.

    Args:
      population_name: The single population served; requests naming any other
        population are rejected with HTTP 404.
      forwarding_info: Callable invoked once per RequestEligibilityEvalTask
        request; its result is returned to the client as
        `task_assignment_forwarding_info`.
    """
    self._population_name = population_name
    self._forwarding_info = forwarding_info
    # Maps task name -> _Task. Always accessed under `_tasks_lock`.
    self._tasks: dict[str, _Task] = {}
    self._tasks_lock = threading.Lock()

  def add_task(self, task_name: str, task_assignment_mode: _TaskAssignmentMode):
    """Informs the service that a new task has been added to the system.

    An existing entry with the same name is replaced.

    Args:
      task_name: Name of the task.
      task_assignment_mode: Assignment mode advertised for the task.
    """
    with self._tasks_lock:
      self._tasks[task_name] = _Task(task_name, task_assignment_mode)

  def remove_task(self, task_name: str):
    """Informs the service that a task has been removed from the system.

    Args:
      task_name: Name of a previously added task.

    Raises:
      KeyError: If no task with that name is registered.
    """
    with self._tasks_lock:
      del self._tasks[task_name]

  @property
  def _population_eligibility_spec(
      self,
  ) -> eligibility_eval_tasks_pb2.PopulationEligibilitySpec:
    """Builds a PopulationEligibilitySpec listing every registered task."""
    # Hold the lock while iterating so concurrent add/remove calls cannot
    # mutate the dict mid-iteration.
    with self._tasks_lock:
      return eligibility_eval_tasks_pb2.PopulationEligibilitySpec(
          task_info=[
              eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo(
                  task_name=task.task_name,
                  task_assignment_mode=task.task_assignment_mode,
              )
              for task in self._tasks.values()
          ]
      )

  @http_actions.proto_action(
      service='google.internal.federatedcompute.v1.EligibilityEvalTasks',
      method='RequestEligibilityEvalTask')
  def request_eligibility_eval_task(
      self, request: eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest
  ) -> eligibility_eval_tasks_pb2.EligibilityEvalTaskResponse:
    """Handles a RequestEligibilityEvalTask request.

    Args:
      request: The client's request.

    Returns:
      A response carrying a fresh session id, forwarding info, and retry
      windows. If the client supports multiple task assignment, the response
      holds an `eligibility_eval_task` whose only populated content is the
      inline PopulationEligibilitySpec; otherwise
      `no_eligibility_eval_configured` is set.

    Raises:
      http_actions.HttpError: With NOT_FOUND if the request names a population
        other than the one this service was created for.
    """
    if request.population_name != self._population_name:
      raise http_actions.HttpError(http.HTTPStatus.NOT_FOUND)
    # NOTE: A production implementation should use
    # `request.attestation_measurement` to verify the device is valid, e.g.
    # using the SafetyNet Attestation API.
    session_id = str(uuid.uuid4())
    logging.debug('[%s] RequestEligibilityEvalTask', session_id)

    # NOTE: A production implementation should vary the retry windows based on
    # the population size and other factors, as described in TFLaS §2.3.
    retry_window = common_pb2.RetryWindow()
    retry_window.delay_min.FromTimedelta(datetime.timedelta(seconds=10))
    retry_window.delay_max.FromTimedelta(datetime.timedelta(seconds=30))

    response = eligibility_eval_tasks_pb2.EligibilityEvalTaskResponse(
        task_assignment_forwarding_info=self._forwarding_info(),
        # Return the same id that was logged above so that this request can be
        # correlated with the session id the client later reports back.
        session_id=session_id,
        retry_window_if_accepted=retry_window,
        retry_window_if_rejected=retry_window,
    )

    # This implementation does not support Eligibility Eval tasks. However, the
    # EligibilityEvalTask response is also used to provide the
    # PopulationEligibilitySpec to clients, so the service returns an
    # EligibilityEvalTask instead of NoEligibilityEvalConfigured if the client
    # supports multiple task assignment.
    capabilities = request.eligibility_eval_task_capabilities
    if capabilities.supports_multiple_task_assignment:
      spec_resource = response.eligibility_eval_task.population_eligibility_spec
      spec_resource.inline_resource.data = (
          self._population_eligibility_spec.SerializeToString()
      )
    else:
      # The message has no fields to assign, so mark it present explicitly.
      response.no_eligibility_eval_configured.SetInParent()
    return response

  @http_actions.proto_action(
      service='google.internal.federatedcompute.v1.EligibilityEvalTasks',
      method='ReportEligibilityEvalTaskResult')
  def report_eligibility_eval_task_result(
      self,
      request: eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultRequest
  ) -> eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultResponse:
    """Handles a ReportEligibilityEvalTaskResult request.

    Args:
      request: The client's report, including the session id and status code.

    Returns:
      An empty ReportEligibilityEvalTaskResultResponse.

    Raises:
      http_actions.HttpError: With NOT_FOUND if the request names a population
        other than the one this service was created for.
    """
    if request.population_name != self._population_name:
      raise http_actions.HttpError(http.HTTPStatus.NOT_FOUND)
    # `status_code` is supplied by the client. `Code.Name` raises ValueError
    # for a number without a defined name, so fall back to the raw number
    # rather than failing a request whose only purpose is to be logged.
    try:
      status_name = code_pb2.Code.Name(request.status_code)
    except ValueError:
      status_name = str(request.status_code)
    # NOTE: A production implementation should collect and report metrics. In
    # this implementation, we simply log the result.
    logging.log(
        logging.DEBUG if request.status_code == code_pb2.OK else logging.WARN,
        '[%s] ReportEligibilityEvalTaskResult: %s', request.session_id,
        status_name)
    return eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultResponse()
139