1# Copyright 2014 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Channel notifications support. 16 17Classes and functions to support channel subscriptions and notifications 18on those channels. 19 20Notes: 21 - This code is based on experimental APIs and is subject to change. 22 - Notification does not do deduplication of notification ids, that's up to 23 the receiver. 24 - Storing the Channel between calls is up to the caller. 25 26 27Example setting up a channel: 28 29 # Create a new channel that gets notifications via webhook. 30 channel = new_webhook_channel("https://example.com/my_web_hook") 31 32 # Store the channel, keyed by 'channel.id'. Store it before calling the 33 # watch method because notifications may start arriving before the watch 34 # method returns. 35 ... 36 37 resp = service.objects().watchAll( 38 bucket="some_bucket_id", body=channel.body()).execute() 39 channel.update(resp) 40 41 # Store the channel, keyed by 'channel.id'. Store it after being updated 42 # since the resource_id value will now be correct, and that's needed to 43 # stop a subscription. 44 ... 45 46 47An example Webhook implementation using webapp2. Note that webapp2 puts 48headers in a case insensitive dictionary, as headers aren't guaranteed to 49always be upper case. 50 51 id = self.request.headers[X_GOOG_CHANNEL_ID] 52 53 # Retrieve the channel by id. 54 channel = ... 55 56 # Parse notification from the headers, including validating the id. 57 n = notification_from_headers(channel, self.request.headers) 58 59 # Do app specific stuff with the notification here. 60 if n.resource_state == 'sync': 61 # Code to handle sync state. 62 elif n.resource_state == 'exists': 63 # Code to handle the exists state. 64 elif n.resource_state == 'not_exists': 65 # Code to handle the not exists state. 66 67 68Example of unsubscribing. 69 70 service.channels().stop(channel.body()).execute() 71""" 72from __future__ import absolute_import 73 74import datetime 75import uuid 76 77from googleapiclient import errors 78from googleapiclient import _helpers as util 79 80 81# The unix time epoch starts at midnight 1970. 82EPOCH = datetime.datetime.utcfromtimestamp(0) 83 84# Map the names of the parameters in the JSON channel description to 85# the parameter names we use in the Channel class. 86CHANNEL_PARAMS = { 87 "address": "address", 88 "id": "id", 89 "expiration": "expiration", 90 "params": "params", 91 "resourceId": "resource_id", 92 "resourceUri": "resource_uri", 93 "type": "type", 94 "token": "token", 95} 96 97X_GOOG_CHANNEL_ID = "X-GOOG-CHANNEL-ID" 98X_GOOG_MESSAGE_NUMBER = "X-GOOG-MESSAGE-NUMBER" 99X_GOOG_RESOURCE_STATE = "X-GOOG-RESOURCE-STATE" 100X_GOOG_RESOURCE_URI = "X-GOOG-RESOURCE-URI" 101X_GOOG_RESOURCE_ID = "X-GOOG-RESOURCE-ID" 102 103 104def _upper_header_keys(headers): 105 new_headers = {} 106 for k, v in headers.items(): 107 new_headers[k.upper()] = v 108 return new_headers 109 110 111class Notification(object): 112 """A Notification from a Channel. 113 114 Notifications are not usually constructed directly, but are returned 115 from functions like notification_from_headers(). 116 117 Attributes: 118 message_number: int, The unique id number of this notification. 119 state: str, The state of the resource being monitored. 120 uri: str, The address of the resource being monitored. 121 resource_id: str, The unique identifier of the version of the resource at 122 this event. 123 """ 124 125 @util.positional(5) 126 def __init__(self, message_number, state, resource_uri, resource_id): 127 """Notification constructor. 128 129 Args: 130 message_number: int, The unique id number of this notification. 131 state: str, The state of the resource being monitored. Can be one 132 of "exists", "not_exists", or "sync". 133 resource_uri: str, The address of the resource being monitored. 134 resource_id: str, The identifier of the watched resource. 135 """ 136 self.message_number = message_number 137 self.state = state 138 self.resource_uri = resource_uri 139 self.resource_id = resource_id 140 141 142class Channel(object): 143 """A Channel for notifications. 144 145 Usually not constructed directly, instead it is returned from helper 146 functions like new_webhook_channel(). 147 148 Attributes: 149 type: str, The type of delivery mechanism used by this channel. For 150 example, 'web_hook'. 151 id: str, A UUID for the channel. 152 token: str, An arbitrary string associated with the channel that 153 is delivered to the target address with each event delivered 154 over this channel. 155 address: str, The address of the receiving entity where events are 156 delivered. Specific to the channel type. 157 expiration: int, The time, in milliseconds from the epoch, when this 158 channel will expire. 159 params: dict, A dictionary of string to string, with additional parameters 160 controlling delivery channel behavior. 161 resource_id: str, An opaque id that identifies the resource that is 162 being watched. Stable across different API versions. 163 resource_uri: str, The canonicalized ID of the watched resource. 164 """ 165 166 @util.positional(5) 167 def __init__( 168 self, 169 type, 170 id, 171 token, 172 address, 173 expiration=None, 174 params=None, 175 resource_id="", 176 resource_uri="", 177 ): 178 """Create a new Channel. 179 180 In user code, this Channel constructor will not typically be called 181 manually since there are functions for creating channels for each specific 182 type with a more customized set of arguments to pass. 183 184 Args: 185 type: str, The type of delivery mechanism used by this channel. For 186 example, 'web_hook'. 187 id: str, A UUID for the channel. 188 token: str, An arbitrary string associated with the channel that 189 is delivered to the target address with each event delivered 190 over this channel. 191 address: str, The address of the receiving entity where events are 192 delivered. Specific to the channel type. 193 expiration: int, The time, in milliseconds from the epoch, when this 194 channel will expire. 195 params: dict, A dictionary of string to string, with additional parameters 196 controlling delivery channel behavior. 197 resource_id: str, An opaque id that identifies the resource that is 198 being watched. Stable across different API versions. 199 resource_uri: str, The canonicalized ID of the watched resource. 200 """ 201 self.type = type 202 self.id = id 203 self.token = token 204 self.address = address 205 self.expiration = expiration 206 self.params = params 207 self.resource_id = resource_id 208 self.resource_uri = resource_uri 209 210 def body(self): 211 """Build a body from the Channel. 212 213 Constructs a dictionary that's appropriate for passing into watch() 214 methods as the value of body argument. 215 216 Returns: 217 A dictionary representation of the channel. 218 """ 219 result = { 220 "id": self.id, 221 "token": self.token, 222 "type": self.type, 223 "address": self.address, 224 } 225 if self.params: 226 result["params"] = self.params 227 if self.resource_id: 228 result["resourceId"] = self.resource_id 229 if self.resource_uri: 230 result["resourceUri"] = self.resource_uri 231 if self.expiration: 232 result["expiration"] = self.expiration 233 234 return result 235 236 def update(self, resp): 237 """Update a channel with information from the response of watch(). 238 239 When a request is sent to watch() a resource, the response returned 240 from the watch() request is a dictionary with updated channel information, 241 such as the resource_id, which is needed when stopping a subscription. 242 243 Args: 244 resp: dict, The response from a watch() method. 245 """ 246 for json_name, param_name in CHANNEL_PARAMS.items(): 247 value = resp.get(json_name) 248 if value is not None: 249 setattr(self, param_name, value) 250 251 252def notification_from_headers(channel, headers): 253 """Parse a notification from the webhook request headers, validate 254 the notification, and return a Notification object. 255 256 Args: 257 channel: Channel, The channel that the notification is associated with. 258 headers: dict, A dictionary like object that contains the request headers 259 from the webhook HTTP request. 260 261 Returns: 262 A Notification object. 263 264 Raises: 265 errors.InvalidNotificationError if the notification is invalid. 266 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int. 267 """ 268 headers = _upper_header_keys(headers) 269 channel_id = headers[X_GOOG_CHANNEL_ID] 270 if channel.id != channel_id: 271 raise errors.InvalidNotificationError( 272 "Channel id mismatch: %s != %s" % (channel.id, channel_id) 273 ) 274 else: 275 message_number = int(headers[X_GOOG_MESSAGE_NUMBER]) 276 state = headers[X_GOOG_RESOURCE_STATE] 277 resource_uri = headers[X_GOOG_RESOURCE_URI] 278 resource_id = headers[X_GOOG_RESOURCE_ID] 279 return Notification(message_number, state, resource_uri, resource_id) 280 281 282@util.positional(2) 283def new_webhook_channel(url, token=None, expiration=None, params=None): 284 """Create a new webhook Channel. 285 286 Args: 287 url: str, URL to post notifications to. 288 token: str, An arbitrary string associated with the channel that 289 is delivered to the target address with each notification delivered 290 over this channel. 291 expiration: datetime.datetime, A time in the future when the channel 292 should expire. Can also be None if the subscription should use the 293 default expiration. Note that different services may have different 294 limits on how long a subscription lasts. Check the response from the 295 watch() method to see the value the service has set for an expiration 296 time. 297 params: dict, Extra parameters to pass on channel creation. Currently 298 not used for webhook channels. 299 """ 300 expiration_ms = 0 301 if expiration: 302 delta = expiration - EPOCH 303 expiration_ms = ( 304 delta.microseconds / 1000 + (delta.seconds + delta.days * 24 * 3600) * 1000 305 ) 306 if expiration_ms < 0: 307 expiration_ms = 0 308 309 return Channel( 310 "web_hook", 311 str(uuid.uuid4()), 312 token, 313 url, 314 expiration=expiration_ms, 315 params=params, 316 ) 317