1# Copyright 2014 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Channel notifications support.
16
17Classes and functions to support channel subscriptions and notifications
18on those channels.
19
20Notes:
21  - This code is based on experimental APIs and is subject to change.
22  - Notification does not do deduplication of notification ids, that's up to
23    the receiver.
24  - Storing the Channel between calls is up to the caller.
25
26
27Example setting up a channel:
28
29  # Create a new channel that gets notifications via webhook.
30  channel = new_webhook_channel("https://example.com/my_web_hook")
31
32  # Store the channel, keyed by 'channel.id'. Store it before calling the
33  # watch method because notifications may start arriving before the watch
34  # method returns.
35  ...
36
37  resp = service.objects().watchAll(
38    bucket="some_bucket_id", body=channel.body()).execute()
39  channel.update(resp)
40
41  # Store the channel, keyed by 'channel.id'. Store it after being updated
42  # since the resource_id value will now be correct, and that's needed to
43  # stop a subscription.
44  ...
45
46
47An example Webhook implementation using webapp2. Note that webapp2 puts
48headers in a case insensitive dictionary, as headers aren't guaranteed to
49always be upper case.
50
51  id = self.request.headers[X_GOOG_CHANNEL_ID]
52
53  # Retrieve the channel by id.
54  channel = ...
55
56  # Parse notification from the headers, including validating the id.
57  n = notification_from_headers(channel, self.request.headers)
58
59  # Do app specific stuff with the notification here.
60  if n.resource_state == 'sync':
61    # Code to handle sync state.
62  elif n.resource_state == 'exists':
63    # Code to handle the exists state.
64  elif n.resource_state == 'not_exists':
65    # Code to handle the not exists state.
66
67
68Example of unsubscribing.
69
70  service.channels().stop(channel.body()).execute()
71"""
72from __future__ import absolute_import
73
74import datetime
75import uuid
76
77from googleapiclient import errors
78from googleapiclient import _helpers as util
79
80
81# The unix time epoch starts at midnight 1970.
82EPOCH = datetime.datetime.utcfromtimestamp(0)
83
84# Map the names of the parameters in the JSON channel description to
85# the parameter names we use in the Channel class.
86CHANNEL_PARAMS = {
87    "address": "address",
88    "id": "id",
89    "expiration": "expiration",
90    "params": "params",
91    "resourceId": "resource_id",
92    "resourceUri": "resource_uri",
93    "type": "type",
94    "token": "token",
95}
96
97X_GOOG_CHANNEL_ID = "X-GOOG-CHANNEL-ID"
98X_GOOG_MESSAGE_NUMBER = "X-GOOG-MESSAGE-NUMBER"
99X_GOOG_RESOURCE_STATE = "X-GOOG-RESOURCE-STATE"
100X_GOOG_RESOURCE_URI = "X-GOOG-RESOURCE-URI"
101X_GOOG_RESOURCE_ID = "X-GOOG-RESOURCE-ID"
102
103
104def _upper_header_keys(headers):
105    new_headers = {}
106    for k, v in headers.items():
107        new_headers[k.upper()] = v
108    return new_headers
109
110
111class Notification(object):
112    """A Notification from a Channel.
113
114  Notifications are not usually constructed directly, but are returned
115  from functions like notification_from_headers().
116
117  Attributes:
118    message_number: int, The unique id number of this notification.
119    state: str, The state of the resource being monitored.
120    uri: str, The address of the resource being monitored.
121    resource_id: str, The unique identifier of the version of the resource at
122      this event.
123  """
124
125    @util.positional(5)
126    def __init__(self, message_number, state, resource_uri, resource_id):
127        """Notification constructor.
128
129    Args:
130      message_number: int, The unique id number of this notification.
131      state: str, The state of the resource being monitored. Can be one
132        of "exists", "not_exists", or "sync".
133      resource_uri: str, The address of the resource being monitored.
134      resource_id: str, The identifier of the watched resource.
135    """
136        self.message_number = message_number
137        self.state = state
138        self.resource_uri = resource_uri
139        self.resource_id = resource_id
140
141
142class Channel(object):
143    """A Channel for notifications.
144
145  Usually not constructed directly, instead it is returned from helper
146  functions like new_webhook_channel().
147
148  Attributes:
149    type: str, The type of delivery mechanism used by this channel. For
150      example, 'web_hook'.
151    id: str, A UUID for the channel.
152    token: str, An arbitrary string associated with the channel that
153      is delivered to the target address with each event delivered
154      over this channel.
155    address: str, The address of the receiving entity where events are
156      delivered. Specific to the channel type.
157    expiration: int, The time, in milliseconds from the epoch, when this
158      channel will expire.
159    params: dict, A dictionary of string to string, with additional parameters
160      controlling delivery channel behavior.
161    resource_id: str, An opaque id that identifies the resource that is
162      being watched. Stable across different API versions.
163    resource_uri: str, The canonicalized ID of the watched resource.
164  """
165
166    @util.positional(5)
167    def __init__(
168        self,
169        type,
170        id,
171        token,
172        address,
173        expiration=None,
174        params=None,
175        resource_id="",
176        resource_uri="",
177    ):
178        """Create a new Channel.
179
180    In user code, this Channel constructor will not typically be called
181    manually since there are functions for creating channels for each specific
182    type with a more customized set of arguments to pass.
183
184    Args:
185      type: str, The type of delivery mechanism used by this channel. For
186        example, 'web_hook'.
187      id: str, A UUID for the channel.
188      token: str, An arbitrary string associated with the channel that
189        is delivered to the target address with each event delivered
190        over this channel.
191      address: str,  The address of the receiving entity where events are
192        delivered. Specific to the channel type.
193      expiration: int, The time, in milliseconds from the epoch, when this
194        channel will expire.
195      params: dict, A dictionary of string to string, with additional parameters
196        controlling delivery channel behavior.
197      resource_id: str, An opaque id that identifies the resource that is
198        being watched. Stable across different API versions.
199      resource_uri: str, The canonicalized ID of the watched resource.
200    """
201        self.type = type
202        self.id = id
203        self.token = token
204        self.address = address
205        self.expiration = expiration
206        self.params = params
207        self.resource_id = resource_id
208        self.resource_uri = resource_uri
209
210    def body(self):
211        """Build a body from the Channel.
212
213    Constructs a dictionary that's appropriate for passing into watch()
214    methods as the value of body argument.
215
216    Returns:
217      A dictionary representation of the channel.
218    """
219        result = {
220            "id": self.id,
221            "token": self.token,
222            "type": self.type,
223            "address": self.address,
224        }
225        if self.params:
226            result["params"] = self.params
227        if self.resource_id:
228            result["resourceId"] = self.resource_id
229        if self.resource_uri:
230            result["resourceUri"] = self.resource_uri
231        if self.expiration:
232            result["expiration"] = self.expiration
233
234        return result
235
236    def update(self, resp):
237        """Update a channel with information from the response of watch().
238
239    When a request is sent to watch() a resource, the response returned
240    from the watch() request is a dictionary with updated channel information,
241    such as the resource_id, which is needed when stopping a subscription.
242
243    Args:
244      resp: dict, The response from a watch() method.
245    """
246        for json_name, param_name in CHANNEL_PARAMS.items():
247            value = resp.get(json_name)
248            if value is not None:
249                setattr(self, param_name, value)
250
251
252def notification_from_headers(channel, headers):
253    """Parse a notification from the webhook request headers, validate
254    the notification, and return a Notification object.
255
256  Args:
257    channel: Channel, The channel that the notification is associated with.
258    headers: dict, A dictionary like object that contains the request headers
259      from the webhook HTTP request.
260
261  Returns:
262    A Notification object.
263
264  Raises:
265    errors.InvalidNotificationError if the notification is invalid.
266    ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
267  """
268    headers = _upper_header_keys(headers)
269    channel_id = headers[X_GOOG_CHANNEL_ID]
270    if channel.id != channel_id:
271        raise errors.InvalidNotificationError(
272            "Channel id mismatch: %s != %s" % (channel.id, channel_id)
273        )
274    else:
275        message_number = int(headers[X_GOOG_MESSAGE_NUMBER])
276        state = headers[X_GOOG_RESOURCE_STATE]
277        resource_uri = headers[X_GOOG_RESOURCE_URI]
278        resource_id = headers[X_GOOG_RESOURCE_ID]
279        return Notification(message_number, state, resource_uri, resource_id)
280
281
282@util.positional(2)
283def new_webhook_channel(url, token=None, expiration=None, params=None):
284    """Create a new webhook Channel.
285
286    Args:
287      url: str, URL to post notifications to.
288      token: str, An arbitrary string associated with the channel that
289        is delivered to the target address with each notification delivered
290        over this channel.
291      expiration: datetime.datetime, A time in the future when the channel
292        should expire. Can also be None if the subscription should use the
293        default expiration. Note that different services may have different
294        limits on how long a subscription lasts. Check the response from the
295        watch() method to see the value the service has set for an expiration
296        time.
297      params: dict, Extra parameters to pass on channel creation. Currently
298        not used for webhook channels.
299    """
300    expiration_ms = 0
301    if expiration:
302        delta = expiration - EPOCH
303        expiration_ms = (
304            delta.microseconds / 1000 + (delta.seconds + delta.days * 24 * 3600) * 1000
305        )
306        if expiration_ms < 0:
307            expiration_ms = 0
308
309    return Channel(
310        "web_hook",
311        str(uuid.uuid4()),
312        token,
313        url,
314        expiration=expiration_ms,
315        params=params,
316    )
317