diff --git a/monkey/common/aws/aws_metadata.py b/monkey/common/aws/aws_metadata.py index cc2463ac2..2032fee81 100644 --- a/monkey/common/aws/aws_metadata.py +++ b/monkey/common/aws/aws_metadata.py @@ -1,6 +1,7 @@ import json import logging import re +from typing import Optional, Tuple import requests @@ -13,42 +14,54 @@ logger = logging.getLogger(__name__) AWS_TIMEOUT = 2 -def fetch_aws_instance_metadata(): +class UnknownAWSInstanceIDError(Exception): + """Raised if the AWS Instance ID could not be determined""" + + +def fetch_aws_instance_metadata() -> Tuple[Optional[str], Optional[str], Optional[str]]: instance_id = None region = None account_id = None try: - response = requests.get( - AWS_LATEST_METADATA_URI_PREFIX + "meta-data/instance-id", - timeout=AWS_TIMEOUT, - ) - if not response: - return (None, None, None) - - instance_id = response.text - - region = _parse_region( - requests.get( - AWS_LATEST_METADATA_URI_PREFIX + "meta-data/placement/availability-zone", - timeout=AWS_TIMEOUT, - ).text - ) - - account_id = _extract_account_id( - requests.get( - AWS_LATEST_METADATA_URI_PREFIX + "dynamic/instance-identity/document", - timeout=AWS_TIMEOUT, - ).text - ) - except (requests.RequestException, IOError, json.decoder.JSONDecodeError) as err: + instance_id = _fetch_aws_instance_id() + region = _fetch_aws_region() + account_id = _fetch_account_id() + except ( + requests.RequestException, + IOError, + json.decoder.JSONDecodeError, + UnknownAWSInstanceIDError, + ) as err: logger.debug(f"Failed init of AWSInstance while getting metadata: {err}") return (None, None, None) return (instance_id, region, account_id) -def _parse_region(region_url_response): +def _fetch_aws_instance_id() -> Optional[str]: + url = AWS_LATEST_METADATA_URI_PREFIX + "meta-data/instance-id" + response = requests.get( + url, + timeout=AWS_TIMEOUT, + ) + + if not response: + raise UnknownAWSInstanceIDError(f"Failed fetch the AWS Instance ID from {url}") + + return response.text + + +def _fetch_aws_region() -> Optional[str]: + return _parse_region( + requests.get( + AWS_LATEST_METADATA_URI_PREFIX + "meta-data/placement/availability-zone", + timeout=AWS_TIMEOUT, + ).text + ) + + +def _parse_region(region_url_response: str) -> Optional[str]: # For a list of regions, see: # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts # .RegionsAndAvailabilityZones.html @@ -61,14 +74,18 @@ def _parse_region(region_url_response): return None -def _extract_account_id(instance_identity_document_response): +def _fetch_account_id() -> str: """ - Extracts the account id from the dynamic/instance-identity/document metadata path. + Fetches and extracts the account id from the dynamic/instance-identity/document metadata path. Based on https://forums.aws.amazon.com/message.jspa?messageID=409028 which has a few more - solutions, - in case Amazon break this mechanism. + solutions, in case Amazon break this mechanism. :param instance_identity_document_response: json returned via the web page ../dynamic/instance-identity/document :return: The account id """ - return json.loads(instance_identity_document_response)[ACCOUNT_ID_KEY] + instance_identity_document = requests.get( + AWS_LATEST_METADATA_URI_PREFIX + "dynamic/instance-identity/document", + timeout=AWS_TIMEOUT, + ).text + + return json.loads(instance_identity_document)[ACCOUNT_ID_KEY]