Agent: Implement ControlChannel should_agent_stop

This commit is contained in:
Ilija Lazoroski 2021-11-22 14:27:39 +01:00
parent 7766e27f16
commit 0d8070080a
2 changed files with 59 additions and 0 deletions

View File

@ -0,0 +1,32 @@
import json
import logging
from abc import ABC
import requests
from common.common_consts.timeouts import SHORT_REQUEST_TIMEOUT
from infection_monkey.config import GUID, WormConfiguration
from monkey.infection_monkey.i_control_channel import IControlChannel
requests.packages.urllib3.disable_warnings()
logger = logging.getLogger(__name__)
class ControlChannel(IControlChannel, ABC):
def should_agent_stop(self) -> bool:
server = WormConfiguration.current_server
if not server:
return
try:
response = requests.get( # noqa: DUO123
f"{server}/api/monkey_control/{GUID}",
verify=False,
timeout=SHORT_REQUEST_TIMEOUT,
)
response = json.loads(response.content.decode())
return response["stop_agent"]
except Exception as e:
logger.error(f"Error happened while trying to connect to server. {e}")

View File

@ -0,0 +1,27 @@
import abc
class IControlChannel(metaclass=abc.ABCMeta):
@abc.abstractmethod
def should_agent_stop(self) -> bool:
"""
Checks if the agent should stop
return: True if the agent should stop, False otherwise
rtype: bool
"""
@abc.abstractmethod
def get_config(self) -> dict:
"""
:return: A dictionary containing Agent Configuration
:rtype: dict
"""
pass
@abc.abstractmethod
def get_credentials_for_propagation(self) -> dict:
"""
:return: A dictionary containing propagation credentials data
:rtype: dict
"""
pass