xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/lib/auth.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# -*- coding: utf-8 -*-
2# Copyright 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Functions for authenticating httplib2 requests with OAuth2 tokens."""
6
7from __future__ import print_function
8
9import os
10
11import httplib2
12
13from autotest_lib.utils.frozen_chromite.lib import cipd
14from autotest_lib.utils.frozen_chromite.lib import cros_build_lib
15from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
16from autotest_lib.utils.frozen_chromite.lib import retry_util
17from autotest_lib.utils.frozen_chromite.lib import path_util
18
19
# HTTP response statuses for which AuthorizedHttp.request renews the token
# and sends the request a second time.
REFRESH_STATUS_CODES = [401]

# Retry count handed to retry_util.GenericRetry by GetAccessToken.
RETRY_GET_ACCESS_TOKEN = 3
24
25
class AccessTokenError(Exception):
  """Raised when a login, token or git-credential command fails."""
28
29
def _GetCipdBinary(pkg_name, bin_name, instance_id):
  """Installs a cipd package under the cache dir and locates a binary in it.

  Args:
    pkg_name: Name of the cipd package to install.
    bin_name: Name of the binary, relative to the package install path.
    instance_id: The instance-id of the package to install.

  Returns:
    The path returned by cipd.InstallPackage joined with bin_name.
  """
  packages_dir = os.path.join(path_util.GetCacheDir(), 'cipd', 'packages')
  cipd_bin = cipd.GetCIPDFromCache()
  install_root = cipd.InstallPackage(
      cipd_bin, pkg_name, instance_id, destination=packages_dir)
  return os.path.join(install_root, bin_name)
40
41
# crbug:871831 default to last sha1 version.
def GetLuciAuth(
    instance_id='git_revision:fd059ace316e4dbcaa5afdcec9ed4a855c4f3c65'):
  """Locates the luci-auth binary, installing its cipd package on the way.

  The lookup is delegated to _GetCipdBinary using the linux-amd64 luci-auth
  package.

  Args:
    instance_id: The instance-id of the package to install.

  Returns:
    The path to the luci-auth binary.
  """
  package = 'infra/tools/luci-auth/linux-amd64'
  binary = 'luci-auth'
  return _GetCipdBinary(package, binary, instance_id)
60
61
# crbug:871831 default to last sha1 version.
def GetLuciGitCreds(
    instance_id='git_revision:fd059ace316e4dbcaa5afdcec9ed4a855c4f3c65'):
  """Locates the git-credential-luci binary, installing its cipd package.

  The lookup is delegated to _GetCipdBinary using the linux-amd64
  git-credential-luci package.

  Args:
    instance_id: The instance-id of the package to install.

  Returns:
    The path to the git-credential-luci binary.
  """
  package = 'infra/tools/luci/git-credential-luci/linux-amd64'
  binary = 'git-credential-luci'
  return _GetCipdBinary(package, binary, instance_id)
80
81
def Login(service_account_json=None):
  """Logs a user into chrome-infra-auth using luci-auth.

  Runs 'luci-auth login' to get an OAuth2 refresh token.

  Args:
    service_account_json: An optional path to a service account. It is only
      passed to luci-auth when it names an existing file.

  Raises:
    AccessTokenError if the login command exits with a non-zero return code.
  """
  logging.info('Logging into chrome-infra-auth with service_account %s',
               service_account_json)

  cmd = [GetLuciAuth(), 'login']
  if service_account_json and os.path.isfile(service_account_json):
    cmd += ['-service-account-json=%s' % service_account_json]

  # The exit status is inspected manually below.
  result = cros_build_lib.run(
      cmd,
      print_cmd=True,
      check=False)

  if result.returncode:
    # Report the exit code so the failure can be diagnosed from the message.
    raise AccessTokenError(
        'Failed at logging in to chrome-infra-auth: exit code %s, may retry.'
        % result.returncode)
108
109
def Token(service_account_json=None):
  """Fetches the OAuth2 token by running 'luci-auth token'.

  Args:
    service_account_json: An optional path to a service account. It is only
      passed to luci-auth when it names an existing file.

  Returns:
    The captured command output with surrounding whitespace stripped.

  Raises:
    AccessTokenError if the token command exits with a non-zero return code.
  """
  token_cmd = [GetLuciAuth(), 'token']
  if service_account_json and os.path.isfile(service_account_json):
    token_cmd.append('-service-account-json=%s' % service_account_json)

  proc = cros_build_lib.run(
      token_cmd,
      print_cmd=False,
      capture_output=True,
      check=False,
      encoding='utf-8')

  if not proc.returncode:
    return proc.output.strip()

  raise AccessTokenError('Failed at getting the access token, may retry.')
139
140
def _TokenAndLoginIfNeed(service_account_json=None, force_token_renew=False):
  """Runs the Token and Login operations in the order the caller asked for.

  With force_token_renew set, Login runs first to renew the token and the
  result of Token is returned.
  Otherwise Token runs first. If it fails, Login is run to refresh the token
  and the original AccessTokenError is raised again, so that GetAccessToken
  can retry _TokenAndLoginIfNeed.

  Args:
    service_account_json: An optional path to a service account.
    force_token_renew: Boolean indicating whether to force a login before
      fetching the token. Defaults to False.

  Returns:
    The token string returned by Token.

  Raises:
    AccessTokenError if the Token operation (or the Login operation) failed.
  """
  if force_token_renew:
    Login(service_account_json=service_account_json)
    return Token(service_account_json=service_account_json)

  try:
    token = Token(service_account_json=service_account_json)
  except AccessTokenError as err:
    Login(service_account_json=service_account_json)
    # Surface the Token failure; the caller decides whether to retry.
    raise err
  return token
172
173
def GetAccessToken(**kwargs):
  """Returns an OAuth2 access token using luci-auth.

  _TokenAndLoginIfNeed is run through retry_util.GenericRetry, retrying only
  when the error raised is an AccessTokenError.

  Args:
    kwargs: Keyword arguments; only 'service_account_json' and
      'force_token_renew' are read and passed to _TokenAndLoginIfNeed.

  Returns:
    The access token string, or None if the token could not be obtained.
  """
  def _IsAccessTokenError(exc):
    """Retry predicate: only AccessTokenError is retried."""
    return isinstance(exc, AccessTokenError)

  try:
    return retry_util.GenericRetry(
        _IsAccessTokenError,
        RETRY_GET_ACCESS_TOKEN,
        _TokenAndLoginIfNeed,
        service_account_json=kwargs.get('service_account_json'),
        force_token_renew=kwargs.get('force_token_renew', False),
        sleep=3)
  except AccessTokenError as e:
    logging.error('Failed at getting the access token: %s ', e)
    # Deliberately not re-raised: the response returned by the request
    # handler is left to report the status and errors.
    return None
203
204
def GitCreds(service_account_json=None):
  """Get the git credential using git-credential-luci.

  Runs 'git-credential-luci get' and extracts the value of the 'password='
  line from its output.

  Args:
    service_account_json: An optional path to a service account. It is only
      passed to the command when it names an existing file.

  Returns:
    The value of the first 'password=' line, with whitespace stripped.

  Raises:
    AccessTokenError if the command exits with a non-zero return code or its
    output has no 'password=' line.
  """
  cmd = [GetLuciGitCreds(), 'get']
  if service_account_json and os.path.isfile(service_account_json):
    cmd += ['-service-account-json=%s' % service_account_json]

  result = cros_build_lib.run(
      cmd,
      print_cmd=False,
      capture_output=True,
      check=False,
      encoding='utf-8')

  if result.returncode:
    raise AccessTokenError('Unable to fetch git credential.')

  key = 'password='
  for line in result.stdout.splitlines():
    if line.startswith(key):
      # Remove only the leading key so that a value which itself contains
      # 'password=' is returned intact.
      return line[len(key):].strip()

  raise AccessTokenError('Unable to fetch git credential.')
236
237
class AuthorizedHttp(object):
  """Http wrapper that adds an OAuth2 bearer token to every request.

  When a response status is in REFRESH_STATUS_CODES, the token is renewed
  and the request is sent one more time.
  """

  def __init__(self, get_access_token, http, **kwargs):
    """Stores the collaborators and fetches the initial token.

    Args:
      get_access_token: Callable returning the token. It is called with
        **kwargs here, and with force_token_renew=True added on refresh.
      http: Object whose request() method performs the request; a new
        httplib2.Http() is created when this is None.
      kwargs: Keyword arguments passed on to get_access_token.
    """
    self.get_access_token = get_access_token
    self.http = http if http is not None else httplib2.Http()
    self.token = self.get_access_token(**kwargs)
    self.kwargs = kwargs

  # Adapted from oauth2client.OAuth2Credentials.authorize.
  # We can't use oauthclient2 because the import will fail on slaves due to
  # missing PyOpenSSL (crbug.com/498467).
  def request(self, *args, **kwargs):
    """Sends a request with an Authorization header, refreshing once if needed.

    Args:
      args: Positional arguments passed through to self.http.request.
      kwargs: Keyword arguments passed through to self.http.request. The
        'headers' entry (which may be missing or None) is copied, never
        modified in place, before the Authorization header is added.

    Returns:
      The (response, content) tuple from the last self.http.request call.
    """
    # Callers may pass headers=None explicitly; treat that like no headers.
    caller_headers = kwargs.get('headers')
    headers = {} if caller_headers is None else caller_headers.copy()
    headers['Authorization'] = 'Bearer %s' % self.token
    kwargs['headers'] = headers

    resp, content = self.http.request(*args, **kwargs)
    if resp.status in REFRESH_STATUS_CODES:
      logging.info('OAuth token TTL expired, auto-refreshing')

      # Token expired, force token renew
      kwargs_copy = dict(self.kwargs, force_token_renew=True)
      self.token = self.get_access_token(**kwargs_copy)

      # TODO(phobbs): delete the "access_token" key from the token file used.
      # kwargs['headers'] is this same dict, so the retry sees the new token.
      headers['Authorization'] = 'Bearer %s' % self.token
      resp, content = self.http.request(*args, **kwargs)

    return resp, content
268