forked from p15670423/monkey
Island: Fetch version and download url from new lambda API
This commit is contained in:
parent
21f9b5ad53
commit
02e719f7b2
|
@ -1,14 +1,13 @@
|
|||
import logging
|
||||
from threading import Event, Thread
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import requests
|
||||
import semantic_version
|
||||
|
||||
from .deployment import Deployment
|
||||
|
||||
VERSION_SERVER_URL_PREF = "https://updates.infectionmonkey.com"
|
||||
VERSION_SERVER_CHECK_NEW_URL = VERSION_SERVER_URL_PREF + "?deployment=%s&monkey_version=%s"
|
||||
VERSION_SERVER_DOWNLOAD_URL = VERSION_SERVER_CHECK_NEW_URL + "&is_download=true"
|
||||
# TODO get redirects instead of using direct links to AWS
|
||||
LATEST_VERSION_URL = "https://njf01cuupf.execute-api.us-east-1.amazonaws.com/default?deployment={}"
|
||||
|
||||
LATEST_VERSION_TIMEOUT = 7
|
||||
|
||||
|
@ -55,29 +54,26 @@ class Version:
|
|||
return self._download_url
|
||||
|
||||
def _set_version_metadata(self):
|
||||
self._latest_version = self._get_latest_version()
|
||||
self._download_url = self._get_download_link()
|
||||
self._latest_version, self._download_url = self._get_version_info()
|
||||
self._initialization_complete.set()
|
||||
|
||||
def _get_latest_version(self) -> str:
|
||||
url = VERSION_SERVER_CHECK_NEW_URL % (self._deployment.value, self._version_number)
|
||||
def _get_version_info(self) -> Tuple[Optional[str], Optional[str]]:
|
||||
url = LATEST_VERSION_URL.format(self._deployment.value)
|
||||
|
||||
try:
|
||||
reply = requests.get(url, timeout=LATEST_VERSION_TIMEOUT)
|
||||
response = requests.get(url, timeout=LATEST_VERSION_TIMEOUT).json()
|
||||
except requests.exceptions.RequestException as err:
|
||||
logger.warning(f"Failed to connect to {VERSION_SERVER_URL_PREF}: {err}")
|
||||
return self._version_number
|
||||
logger.warning(f"Failed to connect to {url}, Error: {err}")
|
||||
return None, None
|
||||
|
||||
res = reply.json().get("newer_version", None)
|
||||
try:
|
||||
download_link = response["download_link"]
|
||||
latest_version = response["version"]
|
||||
except KeyError:
|
||||
logger.error(
|
||||
f"Failed to fetch version information from {url}, response: {response}."
|
||||
f"Most likely deployment {self._deployment.value} is not recognized."
|
||||
)
|
||||
return None, None
|
||||
|
||||
if res is False:
|
||||
return self._version_number
|
||||
|
||||
if not semantic_version.validate(res):
|
||||
logger.warning(f"Recieved invalid version {res} from {VERSION_SERVER_URL_PREF}")
|
||||
return self._version_number
|
||||
|
||||
return res.strip()
|
||||
|
||||
def _get_download_link(self):
|
||||
return VERSION_SERVER_DOWNLOAD_URL % (self._deployment.value, self._version_number)
|
||||
return latest_version, download_link
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from monkey_island.cc import Version
|
||||
from monkey_island.cc.deployment import Deployment
|
||||
|
||||
failed_response = MagicMock()
|
||||
failed_response.return_value.json.return_value = {"message": "Internal server error"}
|
||||
|
||||
successful_response = MagicMock()
|
||||
SUCCESS_VERSION = "1.1.1"
|
||||
SUCCESS_URL = "http://be_free.gov"
|
||||
successful_response.return_value.json.return_value = {
|
||||
"version": SUCCESS_VERSION,
|
||||
"download_link": SUCCESS_URL,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"request_mock",
|
||||
[
|
||||
failed_response,
|
||||
MagicMock(side_effect=requests.exceptions.RequestException("Timeout or something")),
|
||||
],
|
||||
)
|
||||
def test_version__request_failed(monkeypatch, request_mock):
|
||||
monkeypatch.setattr("requests.get", request_mock)
|
||||
version = Version(version_number="1.0.0", deployment=Deployment.DEVELOP)
|
||||
version._initialization_complete.wait()
|
||||
assert version.latest_version is None
|
||||
assert version.download_url is None
|
||||
|
||||
|
||||
def test_version__request_successful(monkeypatch):
|
||||
monkeypatch.setattr("requests.get", successful_response)
|
||||
version = Version(version_number="1.0.0", deployment=Deployment.DEVELOP)
|
||||
version._initialization_complete.wait()
|
||||
assert version.latest_version == SUCCESS_VERSION
|
||||
assert version.download_url == SUCCESS_URL
|
Loading…
Reference in New Issue