1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Action handlers for the EligibilityEvalTasks service. 15 16Eligibility Eval tasks are not currently supported by this demo implementation. 17""" 18 19import dataclasses 20import datetime 21import http 22import threading 23from typing import Callable 24import uuid 25 26from absl import logging 27 28from google.rpc import code_pb2 29from fcp.demo import http_actions 30from fcp.protos.federatedcompute import common_pb2 31from fcp.protos.federatedcompute import eligibility_eval_tasks_pb2 32 33_TaskAssignmentMode = ( 34 eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo.TaskAssignmentMode 35) 36 37 38@dataclasses.dataclass(frozen=True) 39class _Task: 40 task_name: str 41 task_assignment_mode: _TaskAssignmentMode 42 43 44class Service: 45 """Implements the EligibilityEvalTasks service.""" 46 47 def __init__(self, population_name: str, 48 forwarding_info: Callable[[], common_pb2.ForwardingInfo]): 49 self._population_name = population_name 50 self._forwarding_info = forwarding_info 51 self._tasks: dict[str, _Task] = {} 52 self._tasks_lock = threading.Lock() 53 54 def add_task(self, task_name: str, task_assignment_mode: _TaskAssignmentMode): 55 """Informs the service that a new task has been added to the system.""" 56 with self._tasks_lock: 57 self._tasks[task_name] = _Task(task_name, task_assignment_mode) 58 59 def remove_task(self, task_name: str): 60 """Informs the service that a task has been removed from the system.""" 61 with self._tasks_lock: 62 del self._tasks[task_name] 63 64 @property 65 def _population_eligibility_spec( 66 self, 67 ) -> eligibility_eval_tasks_pb2.PopulationEligibilitySpec: 68 with self._tasks_lock: 69 return eligibility_eval_tasks_pb2.PopulationEligibilitySpec( 70 task_info=[ 71 eligibility_eval_tasks_pb2.PopulationEligibilitySpec.TaskInfo( 72 task_name=task.task_name, 73 task_assignment_mode=task.task_assignment_mode, 74 ) 75 for task in self._tasks.values() 76 ] 77 ) 78 79 @http_actions.proto_action( 80 service='google.internal.federatedcompute.v1.EligibilityEvalTasks', 81 method='RequestEligibilityEvalTask') 82 def request_eligibility_eval_task( 83 self, request: eligibility_eval_tasks_pb2.EligibilityEvalTaskRequest 84 ) -> eligibility_eval_tasks_pb2.EligibilityEvalTaskResponse: 85 """Handles a RequestEligibilityEvalTask request.""" 86 if request.population_name != self._population_name: 87 raise http_actions.HttpError(http.HTTPStatus.NOT_FOUND) 88 # NOTE: A production implementation should use 89 # `request.attestation_measurement` to verify the device is valid, e.g. 90 # using the SafetyNet Attestation API. 91 session_id = str(uuid.uuid4()) 92 logging.debug('[%s] RequestEligibilityEvalTask', session_id) 93 94 # NOTE: A production implementation should vary the retry windows based on 95 # the population size and other factors, as described in TFLaS §2.3. 96 retry_window = common_pb2.RetryWindow() 97 retry_window.delay_min.FromTimedelta(datetime.timedelta(seconds=10)) 98 retry_window.delay_max.FromTimedelta(datetime.timedelta(seconds=30)) 99 100 response = eligibility_eval_tasks_pb2.EligibilityEvalTaskResponse( 101 task_assignment_forwarding_info=self._forwarding_info(), 102 session_id=str(uuid.uuid4()), 103 retry_window_if_accepted=retry_window, 104 retry_window_if_rejected=retry_window, 105 ) 106 107 # This implementation does not support Eligibility Eval tasks. However, the 108 # EligibilityEvalTask response is also used to provide the 109 # PopulationEligibilitySpec to clients, so the service returns an 110 # EligibilityEvalTask instead of NoEligibilityEvalConfigured if the client 111 # supports multiple task assignment. 112 capabilities = request.eligibility_eval_task_capabilities 113 if capabilities.supports_multiple_task_assignment: 114 spec_resource = response.eligibility_eval_task.population_eligibility_spec 115 spec_resource.inline_resource.data = ( 116 self._population_eligibility_spec.SerializeToString() 117 ) 118 else: 119 response.no_eligibility_eval_configured.SetInParent() 120 return response 121 122 @http_actions.proto_action( 123 service='google.internal.federatedcompute.v1.EligibilityEvalTasks', 124 method='ReportEligibilityEvalTaskResult') 125 def report_eligibility_eval_task_result( 126 self, 127 request: eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultRequest 128 ) -> eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultResponse: 129 """Handles a ReportEligibilityEvalTaskResult request.""" 130 if request.population_name != self._population_name: 131 raise http_actions.HttpError(http.HTTPStatus.NOT_FOUND) 132 # NOTE: A production implementation should collect and report metrics. In 133 # this implementation, we simply log the result. 134 logging.log( 135 logging.DEBUG if request.status_code == code_pb2.OK else logging.WARN, 136 '[%s] ReportEligibilityEvalTaskResult: %s', request.session_id, 137 code_pb2.Code.Name(request.status_code)) 138 return eligibility_eval_tasks_pb2.ReportEligibilityEvalTaskResultResponse() 139