Merge pull request #1457 from guardicore/1126/ut_for_pba_file_upload

UT: Add unit tests for pba_upload
This commit is contained in:
VakarisZ 2021-09-10 17:03:54 +03:00 committed by GitHub
commit 45429f6b29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 139 additions and 0 deletions

View File

@ -1,4 +1,5 @@
import copy import copy
from http import HTTPStatus
import flask_restful import flask_restful
from flask import Response, request, send_from_directory from flask import Response, request, send_from_directory
@ -27,6 +28,9 @@ class FileUpload(flask_restful.Resource):
:param file_type: Type indicates which file to send, linux or windows :param file_type: Type indicates which file to send, linux or windows
:return: Returns file contents :return: Returns file contents
""" """
if self.is_pba_file_type_supported(file_type):
return Response(status=HTTPStatus.UNPROCESSABLE_ENTITY, mimetype="text/plain")
# Verify that file_name is indeed a file from config # Verify that file_name is indeed a file from config
if file_type == LINUX_PBA_TYPE: if file_type == LINUX_PBA_TYPE:
filename = ConfigService.get_config_value(copy.deepcopy(PBA_LINUX_FILENAME_PATH)) filename = ConfigService.get_config_value(copy.deepcopy(PBA_LINUX_FILENAME_PATH))
@ -41,6 +45,9 @@ class FileUpload(flask_restful.Resource):
:param file_type: Type indicates which file was received, linux or windows :param file_type: Type indicates which file was received, linux or windows
:return: Returns flask response object with uploaded file's filename :return: Returns flask response object with uploaded file's filename
""" """
if self.is_pba_file_type_supported(file_type):
return Response(status=HTTPStatus.UNPROCESSABLE_ENTITY, mimetype="text/plain")
filename = FileUpload.upload_pba_file( filename = FileUpload.upload_pba_file(
request.files["filepond"], (file_type == LINUX_PBA_TYPE) request.files["filepond"], (file_type == LINUX_PBA_TYPE)
) )
@ -74,6 +81,9 @@ class FileUpload(flask_restful.Resource):
:param file_type: Type indicates which file was deleted, linux of windows :param file_type: Type indicates which file was deleted, linux of windows
:return: Empty response :return: Empty response
""" """
if self.is_pba_file_type_supported(file_type):
return Response(status=HTTPStatus.UNPROCESSABLE_ENTITY, mimetype="text/plain")
filename_path = ( filename_path = (
PBA_LINUX_FILENAME_PATH if file_type == "PBAlinux" else PBA_WINDOWS_FILENAME_PATH PBA_LINUX_FILENAME_PATH if file_type == "PBAlinux" else PBA_WINDOWS_FILENAME_PATH
) )
@ -83,3 +93,7 @@ class FileUpload(flask_restful.Resource):
ConfigService.set_config_value(filename_path, "") ConfigService.set_config_value(filename_path, "")
return {} return {}
@staticmethod
def is_pba_file_type_supported(file_type: str) -> bool:
return file_type not in {LINUX_PBA_TYPE, WINDOWS_PBA_TYPE}

View File

@ -0,0 +1,125 @@
import pytest
from tests.utils import raise_
from monkey_island.cc.resources.pba_file_upload import LINUX_PBA_TYPE, WINDOWS_PBA_TYPE
from monkey_island.cc.services.post_breach_files import PostBreachFilesService
TEST_FILE = b"""-----------------------------1
Content-Disposition: form-data; name="filepond"
{}
-----------------------------1
Content-Disposition: form-data; name="filepond"; filename="test.py"
Content-Type: text/x-python
m0nk3y
-----------------------------1--"""
@pytest.fixture(autouse=True)
def custom_pba_directory(tmpdir):
PostBreachFilesService.initialize(tmpdir)
@pytest.fixture
def mock_set_config_value(monkeypatch):
monkeypatch.setattr(
"monkey_island.cc.services.config.ConfigService.set_config_value", lambda _, __: None
)
@pytest.fixture
def mock_get_config_value(monkeypatch):
monkeypatch.setattr(
"monkey_island.cc.services.config.ConfigService.get_config_value", lambda _: "test.py"
)
@pytest.mark.parametrize("pba_os", [LINUX_PBA_TYPE, WINDOWS_PBA_TYPE])
def test_pba_file_upload_post(flask_client, pba_os, monkeypatch, mock_set_config_value):
resp = flask_client.post(
f"/api/fileUpload/{pba_os}",
data=TEST_FILE,
content_type="multipart/form-data; " "boundary=---------------------------" "1",
follow_redirects=True,
)
assert resp.status_code == 200
def test_pba_file_upload_post__invalid(flask_client, monkeypatch, mock_set_config_value):
resp = flask_client.post(
"/api/fileUpload/bogus",
data=TEST_FILE,
content_type="multipart/form-data; " "boundary=---------------------------" "1",
follow_redirects=True,
)
assert resp.status_code == 422
@pytest.mark.parametrize("pba_os", [LINUX_PBA_TYPE, WINDOWS_PBA_TYPE])
def test_pba_file_upload_post__internal_server_error(
flask_client, pba_os, monkeypatch, mock_set_config_value
):
monkeypatch.setattr(
"monkey_island.cc.resources.pba_file_upload.FileUpload.upload_pba_file",
lambda x, y: raise_(Exception()),
)
resp = flask_client.post(
f"/api/fileUpload/{pba_os}",
data=TEST_FILE,
content_type="multipart/form-data; boundary=---------------------------1",
follow_redirects=True,
)
assert resp.status_code == 500
@pytest.mark.parametrize("pba_os", [LINUX_PBA_TYPE, WINDOWS_PBA_TYPE])
def test_pba_file_upload_get__file_not_found(
flask_client, pba_os, monkeypatch, mock_get_config_value
):
resp = flask_client.get(f"/api/fileUpload/{pba_os}?load=bogus_mogus.py")
assert resp.status_code == 404
@pytest.mark.parametrize("pba_os", [LINUX_PBA_TYPE, WINDOWS_PBA_TYPE])
def test_pba_file_upload_endpoint(
flask_client, pba_os, monkeypatch, mock_get_config_value, mock_set_config_value
):
resp_post = flask_client.post(
f"/api/fileUpload/{pba_os}",
data=TEST_FILE,
content_type="multipart/form-data; " "boundary=---------------------------" "1",
follow_redirects=True,
)
resp_get = flask_client.get(f"/api/fileUpload/{pba_os}?load=test.py")
resp_delete = flask_client.delete(
f"/api/fileUpload/{pba_os}", data="test.py", content_type="text/plain;"
)
resp_get_del = flask_client.get(f"/api/fileUpload/{pba_os}?load=test.py")
assert resp_post.status_code == 200
assert resp_get.status_code == 200
assert resp_get.data.decode() == "m0nk3y"
assert resp_delete.status_code == 200
assert resp_get_del.status_code == 404
def test_pba_file_upload_endpoint__invalid(
flask_client, monkeypatch, mock_set_config_value, mock_get_config_value
):
resp_post = flask_client.post(
"/api/fileUpload/bogus",
data=TEST_FILE,
content_type="multipart/form-data; " "boundary=---------------------------" "1",
follow_redirects=True,
)
resp_get = flask_client.get("/api/fileUpload/bogus?load=test.py")
resp_delete = flask_client.delete(
"/api/fileUpload/bogus", data="test.py", content_type="text/plain;"
)
assert resp_post.status_code == 422
assert resp_get.status_code == 422
assert resp_delete.status_code == 422