|
| 1 | +import requests |
| 2 | +import json |
| 3 | + |
| 4 | +__version__ = "0.2.0" |
| 5 | + |
| 6 | +API_ENDPOINT = "https://api.warrant.dev" |
| 7 | +API_VERSION = "/v1" |
| 8 | + |
| 9 | +class WarrantException(Exception): |
| 10 | + def __init__(self, msg, status_code=-1): |
| 11 | + if status_code == -1: |
| 12 | + message = 'Warrant error: ' + msg |
| 13 | + else: |
| 14 | + message = f"Warrant error: {status_code} " + msg |
| 15 | + super().__init__(message) |
| 16 | + |
| 17 | +class User(object): |
| 18 | + def __init__(self, object_type, object_id, relation): |
| 19 | + self.objectType = object_type |
| 20 | + self.objectId = object_id |
| 21 | + self.relation = relation |
| 22 | + |
| 23 | +class Warrant(object): |
| 24 | + def __init__(self, api_key): |
| 25 | + self._apiKey = api_key |
| 26 | + |
| 27 | + def _make_post_request(self, uri, json={}): |
| 28 | + headers = { "Authorization": "ApiKey " + self._apiKey } |
| 29 | + resp = requests.post(url = API_ENDPOINT+API_VERSION+uri, headers = headers, json = json) |
| 30 | + if resp.status_code == 200: |
| 31 | + return resp.json() |
| 32 | + else: |
| 33 | + raise WarrantException(msg=resp.text, status_code=resp.status_code) |
| 34 | + |
| 35 | + def create_user(self, user_id=""): |
| 36 | + if user_id == "": |
| 37 | + payload = {} |
| 38 | + else: |
| 39 | + payload = { "userId": user_id } |
| 40 | + json = self._make_post_request(uri="/users", json=payload) |
| 41 | + return json['userId'] |
| 42 | + |
| 43 | + def create_session(self, user_id): |
| 44 | + if user_id == "": |
| 45 | + raise WarrantException(msg="Invalid userId provided") |
| 46 | + json = self._make_post_request(uri="/users/"+user_id+"/sessions") |
| 47 | + return json['token'] |
| 48 | + |
| 49 | + def create_warrant(self, object_type, object_id, relation, user): |
| 50 | + if object_type == "" or object_id == "" or relation == "": |
| 51 | + raise WarrantException(msg="Invalid object_type, object_id and/or relation") |
| 52 | + payload = { |
| 53 | + "objectType": object_type, |
| 54 | + "objectId": object_id, |
| 55 | + "relation": relation |
| 56 | + } |
| 57 | + if isinstance(user, str): |
| 58 | + payload["user"] = { "userId": user } |
| 59 | + elif isinstance(user, User): |
| 60 | + payload["user"] = json.dumps(user.__dict__) |
| 61 | + else: |
| 62 | + raise WarrantException(msg="Invalid type for \'user\'. Must be of type User or str") |
| 63 | + resp = self._make_post_request(uri="/warrants", json=payload) |
| 64 | + return resp['id'] |
| 65 | + |
| 66 | + def is_authorized(self, object_type, object_id, relation, user_to_check): |
| 67 | + if object_type == "" or object_id == "" or relation == "": |
| 68 | + raise WarrantException(msg="Invalid object_type, object_id and/or relation") |
| 69 | + payload = { |
| 70 | + "objectType": object_type, |
| 71 | + "objectId": object_id, |
| 72 | + "relation": relation |
| 73 | + } |
| 74 | + if isinstance(user_to_check, str): |
| 75 | + payload["user"] = { "userId": user_to_check } |
| 76 | + elif isinstance(user_to_check, User): |
| 77 | + payload["user"] = json.dumps(user_to_check.__dict__) |
| 78 | + else: |
| 79 | + raise WarrantException(msg="Invalid type for \'user_to_check\'. Must be of type User or str") |
| 80 | + headers = { "Authorization": "ApiKey " + self._apiKey } |
| 81 | + resp = requests.post(url = API_ENDPOINT+API_VERSION+"/authorize", headers = headers, json=payload) |
| 82 | + if resp.status_code == 200: |
| 83 | + return True |
| 84 | + elif resp.status_code == 401: |
| 85 | + return False |
| 86 | + else: |
| 87 | + raise WarrantException(msg=resp.text, status_code=resp.status_code) |
0 commit comments