UT: Add unit tests for pba_upload

This commit is contained in:
Ilija Lazoroski 2021-09-10 14:01:58 +02:00
parent dec2fc43c2
commit 2fd38061b2
2 changed files with 138 additions and 0 deletions

View File

@ -27,6 +27,9 @@ class FileUpload(flask_restful.Resource):
:param file_type: Type indicates which file to send, linux or windows :param file_type: Type indicates which file to send, linux or windows
:return: Returns file contents :return: Returns file contents
""" """
if self.check_file_type(file_type):
return Response(status=422, mimetype="text/plain")
# Verify that file_name is indeed a file from config # Verify that file_name is indeed a file from config
if file_type == LINUX_PBA_TYPE: if file_type == LINUX_PBA_TYPE:
filename = ConfigService.get_config_value(copy.deepcopy(PBA_LINUX_FILENAME_PATH)) filename = ConfigService.get_config_value(copy.deepcopy(PBA_LINUX_FILENAME_PATH))
@ -41,6 +44,9 @@ class FileUpload(flask_restful.Resource):
:param file_type: Type indicates which file was received, linux or windows :param file_type: Type indicates which file was received, linux or windows
:return: Returns flask response object with uploaded file's filename :return: Returns flask response object with uploaded file's filename
""" """
if self.check_file_type(file_type):
return Response(status=422, mimetype="text/plain")
filename = FileUpload.upload_pba_file( filename = FileUpload.upload_pba_file(
request.files["filepond"], (file_type == LINUX_PBA_TYPE) request.files["filepond"], (file_type == LINUX_PBA_TYPE)
) )
@ -74,6 +80,9 @@ class FileUpload(flask_restful.Resource):
:param file_type: Type indicates which file was deleted, linux of windows :param file_type: Type indicates which file was deleted, linux of windows
:return: Empty response :return: Empty response
""" """
if self.check_file_type(file_type):
return Response(status=422, mimetype="text/plain")
filename_path = ( filename_path = (
PBA_LINUX_FILENAME_PATH if file_type == "PBAlinux" else PBA_WINDOWS_FILENAME_PATH PBA_LINUX_FILENAME_PATH if file_type == "PBAlinux" else PBA_WINDOWS_FILENAME_PATH
) )
@ -83,3 +92,12 @@ class FileUpload(flask_restful.Resource):
ConfigService.set_config_value(filename_path, "") ConfigService.set_config_value(filename_path, "")
return {} return {}
@staticmethod
def check_file_type(file_type):
"""
Check if the file type is not supported
:param file_type: Type indicates which file was received, linux or windows
:return: Boolean
"""
return file_type not in {LINUX_PBA_TYPE, WINDOWS_PBA_TYPE}

View File

@ -0,0 +1,120 @@
import pytest
from tests.utils import raise_
from monkey_island.cc.resources.pba_file_upload import LINUX_PBA_TYPE, WINDOWS_PBA_TYPE
from monkey_island.cc.services.post_breach_files import PostBreachFilesService
TEST_FILE = b"""-----------------------------1
Content-Disposition: form-data; name="filepond"
{}
-----------------------------1
Content-Disposition: form-data; name="filepond"; filename="test.py"
Content-Type: text/x-python
m0nk3y
-----------------------------1--"""
@pytest.fixture(autouse=True)
def custom_pba_directory(tmpdir):
PostBreachFilesService.initialize(tmpdir)
@pytest.fixture
def fake_set_config_value(monkeypatch):
monkeypatch.setattr(
"monkey_island.cc.services.config.ConfigService.set_config_value", lambda _, __: None
)
@pytest.fixture
def fake_get_config_value(monkeypatch):
monkeypatch.setattr(
"monkey_island.cc.services.config.ConfigService.get_config_value", lambda _: "test.py"
)
@pytest.mark.parametrize("pba_os", [LINUX_PBA_TYPE, WINDOWS_PBA_TYPE])
def test_pba_file_upload_post(flask_client, pba_os, monkeypatch, fake_set_config_value):
resp = flask_client.post(
f"/api/fileUpload/{pba_os}",
data=TEST_FILE,
content_type="multipart/form-data; " "boundary=---------------------------" "1",
follow_redirects=True,
)
assert resp.status_code == 200
def test_pba_file_upload_post__invalid(flask_client, monkeypatch, fake_set_config_value):
resp = flask_client.post(
"/api/fileUpload/bogus",
data=TEST_FILE,
content_type="multipart/form-data; " "boundary=---------------------------" "1",
follow_redirects=True,
)
assert resp.status_code == 422
@pytest.mark.parametrize("pba_os", [LINUX_PBA_TYPE, WINDOWS_PBA_TYPE])
def test_pba_file_upload_post__internal_server_error(
flask_client, pba_os, monkeypatch, fake_set_config_value
):
monkeypatch.setattr(
"monkey_island.cc.resources.pba_file_upload.FileUpload.upload_pba_file",
lambda x, y: raise_(Exception()),
)
resp = flask_client.post(
f"/api/fileUpload/{pba_os}",
data=TEST_FILE,
content_type="multipart/form-data; boundary=---------------------------1",
follow_redirects=True,
)
assert resp.status_code == 500
@pytest.mark.parametrize("pba_os", [LINUX_PBA_TYPE, WINDOWS_PBA_TYPE])
def test_pba_file_upload_get__file_not_found(
flask_client, pba_os, monkeypatch, fake_get_config_value
):
resp = flask_client.get(f"/api/fileUpload/{pba_os}?load=bogus_mogus.py")
assert resp.status_code == 404
@pytest.mark.parametrize("pba_os", [LINUX_PBA_TYPE, WINDOWS_PBA_TYPE])
def test_pba_file_upload_endpoint(
flask_client, pba_os, monkeypatch, fake_get_config_value, fake_set_config_value
):
resp_post = flask_client.post(
f"/api/fileUpload/{pba_os}",
data=TEST_FILE,
content_type="multipart/form-data; " "boundary=---------------------------" "1",
follow_redirects=True,
)
resp_get = flask_client.get(f"/api/fileUpload/{pba_os}?load=test.py")
resp_delete = flask_client.delete(
f"/api/fileUpload/{pba_os}", data="test.py", content_type="text/plain;"
)
assert resp_post.status_code == 200
assert resp_get.status_code == 200
assert resp_get.data.decode() == "m0nk3y"
assert resp_delete.status_code == 200
def test_pba_file_upload_endpoint__invalid(
flask_client, monkeypatch, fake_set_config_value, fake_get_config_value
):
resp_post = flask_client.post(
"/api/fileUpload/bogus",
data=TEST_FILE,
content_type="multipart/form-data; " "boundary=---------------------------" "1",
follow_redirects=True,
)
resp_get = flask_client.get("/api/fileUpload/bogus?load=test.py")
resp_delete = flask_client.delete(
"/api/fileUpload/bogus", data="test.py", content_type="text/plain;"
)
assert resp_post.status_code == 422
assert resp_get.status_code == 422
assert resp_delete.status_code == 422