monkey/envs/monkey_zoo/blackbox/island_client/monkey_island_requests.py

123 lines
4.3 KiB
Python

import functools
import logging
from datetime import timedelta
from typing import Dict
import requests
from envs.monkey_zoo.blackbox.island_client.supported_request_method import SupportedRequestMethod
# SHA3-512 of '1234567890!@#$%^&*()_nothing_up_my_sleeve_1234567890!@#$%^&*()'
NO_AUTH_CREDS = "1234567890!@#$%^&*()_nothing_up_my_sleeve_1234567890!@#$%^&*()"
LOGGER = logging.getLogger(__name__)
class AuthenticationFailedError(Exception):
pass
# noinspection PyArgumentList
class MonkeyIslandRequests(object):
def __init__(self, server_address):
self.addr = "https://{IP}/".format(IP=server_address)
self.token = self.try_get_jwt_from_server()
self.supported_request_methods = {
SupportedRequestMethod.GET: self.get,
SupportedRequestMethod.POST: self.post,
SupportedRequestMethod.PATCH: self.patch,
SupportedRequestMethod.DELETE: self.delete,
}
def get_request_time(self, url, method: SupportedRequestMethod, data=None):
response = self.send_request_by_method(url, method, data)
if response.ok:
LOGGER.debug(f"Got ok for {url} content peek:\n{response.content[:120].strip()}")
return response.elapsed
else:
LOGGER.error(f"Trying to get {url} but got unexpected {str(response)}")
# instead of raising for status, mark failed responses as maxtime
return timedelta.max
def send_request_by_method(self, url, method=SupportedRequestMethod.GET, data=None):
if data:
return self.supported_request_methods[method](url, data)
else:
return self.supported_request_methods[method](url)
def try_get_jwt_from_server(self):
try:
return self.get_jwt_from_server()
except AuthenticationFailedError:
self.try_set_island_to_no_password()
return self.get_jwt_from_server()
except requests.ConnectionError as err:
LOGGER.error(
"Unable to connect to island, aborting! Error information: {}. Server: {}".format(
err, self.addr
)
)
assert False
def get_jwt_from_server(self):
resp = requests.post( # noqa: DUO123
self.addr + "api/auth",
json={"username": NO_AUTH_CREDS, "password": NO_AUTH_CREDS},
verify=False,
)
if resp.status_code == 401:
raise AuthenticationFailedError
return resp.json()["access_token"]
def try_set_island_to_no_password(self):
requests.patch( # noqa: DUO123
self.addr + "api/environment", json={"server_config": "standard"}, verify=False
)
class _Decorators:
@classmethod
def refresh_jwt_token(cls, request_function):
@functools.wraps(request_function)
def request_function_wrapper(self, *args, **kwargs):
self.token = self.try_get_jwt_from_server()
# noinspection PyArgumentList
return request_function(self, *args, **kwargs)
return request_function_wrapper
@_Decorators.refresh_jwt_token
def get(self, url, data=None):
return requests.get( # noqa: DUO123
self.addr + url,
headers=self.get_jwt_header(),
params=data,
verify=False,
)
@_Decorators.refresh_jwt_token
def post(self, url, data):
return requests.post( # noqa: DUO123
self.addr + url, data=data, headers=self.get_jwt_header(), verify=False
)
@_Decorators.refresh_jwt_token
def post_json(self, url, data: Dict):
return requests.post( # noqa: DUO123
self.addr + url, json=data, headers=self.get_jwt_header(), verify=False
)
@_Decorators.refresh_jwt_token
def patch(self, url, data: Dict):
return requests.patch( # noqa: DUO123
self.addr + url, data=data, headers=self.get_jwt_header(), verify=False
)
@_Decorators.refresh_jwt_token
def delete(self, url):
return requests.delete( # noqa: DUO123
self.addr + url, headers=self.get_jwt_header(), verify=False
)
@_Decorators.refresh_jwt_token
def get_jwt_header(self):
return {"Authorization": "Bearer " + self.token}