xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/third_party/googleapiclient/channel.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1"""Channel notifications support.
2
3Classes and functions to support channel subscriptions and notifications
4on those channels.
5
6Notes:
7  - This code is based on experimental APIs and is subject to change.
8  - Notification does not do deduplication of notification ids, that's up to
9    the receiver.
10  - Storing the Channel between calls is up to the caller.
11
12
13Example setting up a channel:
14
15  # Create a new channel that gets notifications via webhook.
16  channel = new_webhook_channel("https://example.com/my_web_hook")
17
18  # Store the channel, keyed by 'channel.id'. Store it before calling the
19  # watch method because notifications may start arriving before the watch
20  # method returns.
21  ...
22
23  resp = service.objects().watchAll(
24    bucket="some_bucket_id", body=channel.body()).execute()
25  channel.update(resp)
26
27  # Store the channel, keyed by 'channel.id'. Store it after being updated
28  # since the resource_id value will now be correct, and that's needed to
29  # stop a subscription.
30  ...
31
32
33An example Webhook implementation using webapp2. Note that webapp2 puts
34headers in a case insensitive dictionary, as headers aren't guaranteed to
35always be upper case.
36
37  id = self.request.headers[X_GOOG_CHANNEL_ID]
38
39  # Retrieve the channel by id.
40  channel = ...
41
42  # Parse notification from the headers, including validating the id.
43  n = notification_from_headers(channel, self.request.headers)
44
45  # Do app specific stuff with the notification here.
46  if n.resource_state == 'sync':
47    # Code to handle sync state.
48  elif n.resource_state == 'exists':
49    # Code to handle the exists state.
50  elif n.resource_state == 'not_exists':
51    # Code to handle the not exists state.
52
53
54Example of unsubscribing.
55
56  service.channels().stop(channel.body())
57"""
58from __future__ import absolute_import
59
60import datetime
61import uuid
62
63from googleapiclient import errors
64import six
65
66# Oauth2client < 3 has the positional helper in 'util', >= 3 has it
67# in '_helpers'.
68try:
69  from oauth2client import util
70except ImportError:
71  from oauth2client import _helpers as util
72
73
74# The unix time epoch starts at midnight 1970.
75EPOCH = datetime.datetime.utcfromtimestamp(0)
76
77# Map the names of the parameters in the JSON channel description to
78# the parameter names we use in the Channel class.
79CHANNEL_PARAMS = {
80    'address': 'address',
81    'id': 'id',
82    'expiration': 'expiration',
83    'params': 'params',
84    'resourceId': 'resource_id',
85    'resourceUri': 'resource_uri',
86    'type': 'type',
87    'token': 'token',
88    }
89
90X_GOOG_CHANNEL_ID     = 'X-GOOG-CHANNEL-ID'
91X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER'
92X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE'
93X_GOOG_RESOURCE_URI   = 'X-GOOG-RESOURCE-URI'
94X_GOOG_RESOURCE_ID    = 'X-GOOG-RESOURCE-ID'
95
96
97def _upper_header_keys(headers):
98  new_headers = {}
99  for k, v in six.iteritems(headers):
100    new_headers[k.upper()] = v
101  return new_headers
102
103
104class Notification(object):
105  """A Notification from a Channel.
106
107  Notifications are not usually constructed directly, but are returned
108  from functions like notification_from_headers().
109
110  Attributes:
111    message_number: int, The unique id number of this notification.
112    state: str, The state of the resource being monitored.
113    uri: str, The address of the resource being monitored.
114    resource_id: str, The unique identifier of the version of the resource at
115      this event.
116  """
117  @util.positional(5)
118  def __init__(self, message_number, state, resource_uri, resource_id):
119    """Notification constructor.
120
121    Args:
122      message_number: int, The unique id number of this notification.
123      state: str, The state of the resource being monitored. Can be one
124        of "exists", "not_exists", or "sync".
125      resource_uri: str, The address of the resource being monitored.
126      resource_id: str, The identifier of the watched resource.
127    """
128    self.message_number = message_number
129    self.state = state
130    self.resource_uri = resource_uri
131    self.resource_id = resource_id
132
133
134class Channel(object):
135  """A Channel for notifications.
136
137  Usually not constructed directly, instead it is returned from helper
138  functions like new_webhook_channel().
139
140  Attributes:
141    type: str, The type of delivery mechanism used by this channel. For
142      example, 'web_hook'.
143    id: str, A UUID for the channel.
144    token: str, An arbitrary string associated with the channel that
145      is delivered to the target address with each event delivered
146      over this channel.
147    address: str, The address of the receiving entity where events are
148      delivered. Specific to the channel type.
149    expiration: int, The time, in milliseconds from the epoch, when this
150      channel will expire.
151    params: dict, A dictionary of string to string, with additional parameters
152      controlling delivery channel behavior.
153    resource_id: str, An opaque id that identifies the resource that is
154      being watched. Stable across different API versions.
155    resource_uri: str, The canonicalized ID of the watched resource.
156  """
157
158  @util.positional(5)
159  def __init__(self, type, id, token, address, expiration=None,
160               params=None, resource_id="", resource_uri=""):
161    """Create a new Channel.
162
163    In user code, this Channel constructor will not typically be called
164    manually since there are functions for creating channels for each specific
165    type with a more customized set of arguments to pass.
166
167    Args:
168      type: str, The type of delivery mechanism used by this channel. For
169        example, 'web_hook'.
170      id: str, A UUID for the channel.
171      token: str, An arbitrary string associated with the channel that
172        is delivered to the target address with each event delivered
173        over this channel.
174      address: str,  The address of the receiving entity where events are
175        delivered. Specific to the channel type.
176      expiration: int, The time, in milliseconds from the epoch, when this
177        channel will expire.
178      params: dict, A dictionary of string to string, with additional parameters
179        controlling delivery channel behavior.
180      resource_id: str, An opaque id that identifies the resource that is
181        being watched. Stable across different API versions.
182      resource_uri: str, The canonicalized ID of the watched resource.
183    """
184    self.type = type
185    self.id = id
186    self.token = token
187    self.address = address
188    self.expiration = expiration
189    self.params = params
190    self.resource_id = resource_id
191    self.resource_uri = resource_uri
192
193  def body(self):
194    """Build a body from the Channel.
195
196    Constructs a dictionary that's appropriate for passing into watch()
197    methods as the value of body argument.
198
199    Returns:
200      A dictionary representation of the channel.
201    """
202    result = {
203        'id': self.id,
204        'token': self.token,
205        'type': self.type,
206        'address': self.address
207        }
208    if self.params:
209      result['params'] = self.params
210    if self.resource_id:
211      result['resourceId'] = self.resource_id
212    if self.resource_uri:
213      result['resourceUri'] = self.resource_uri
214    if self.expiration:
215      result['expiration'] = self.expiration
216
217    return result
218
219  def update(self, resp):
220    """Update a channel with information from the response of watch().
221
222    When a request is sent to watch() a resource, the response returned
223    from the watch() request is a dictionary with updated channel information,
224    such as the resource_id, which is needed when stopping a subscription.
225
226    Args:
227      resp: dict, The response from a watch() method.
228    """
229    for json_name, param_name in six.iteritems(CHANNEL_PARAMS):
230      value = resp.get(json_name)
231      if value is not None:
232        setattr(self, param_name, value)
233
234
235def notification_from_headers(channel, headers):
236  """Parse a notification from the webhook request headers, validate
237    the notification, and return a Notification object.
238
239  Args:
240    channel: Channel, The channel that the notification is associated with.
241    headers: dict, A dictionary like object that contains the request headers
242      from the webhook HTTP request.
243
244  Returns:
245    A Notification object.
246
247  Raises:
248    errors.InvalidNotificationError if the notification is invalid.
249    ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
250  """
251  headers = _upper_header_keys(headers)
252  channel_id = headers[X_GOOG_CHANNEL_ID]
253  if channel.id != channel_id:
254    raise errors.InvalidNotificationError(
255        'Channel id mismatch: %s != %s' % (channel.id, channel_id))
256  else:
257    message_number = int(headers[X_GOOG_MESSAGE_NUMBER])
258    state = headers[X_GOOG_RESOURCE_STATE]
259    resource_uri = headers[X_GOOG_RESOURCE_URI]
260    resource_id = headers[X_GOOG_RESOURCE_ID]
261    return Notification(message_number, state, resource_uri, resource_id)
262
263
264@util.positional(2)
265def new_webhook_channel(url, token=None, expiration=None, params=None):
266    """Create a new webhook Channel.
267
268    Args:
269      url: str, URL to post notifications to.
270      token: str, An arbitrary string associated with the channel that
271        is delivered to the target address with each notification delivered
272        over this channel.
273      expiration: datetime.datetime, A time in the future when the channel
274        should expire. Can also be None if the subscription should use the
275        default expiration. Note that different services may have different
276        limits on how long a subscription lasts. Check the response from the
277        watch() method to see the value the service has set for an expiration
278        time.
279      params: dict, Extra parameters to pass on channel creation. Currently
280        not used for webhook channels.
281    """
282    expiration_ms = 0
283    if expiration:
284      delta = expiration - EPOCH
285      expiration_ms = delta.microseconds/1000 + (
286          delta.seconds + delta.days*24*3600)*1000
287      if expiration_ms < 0:
288        expiration_ms = 0
289
290    return Channel('web_hook', str(uuid.uuid4()),
291                   token, url, expiration=expiration_ms,
292                   params=params)
293
294