from flask import Flask, render_template, request, jsonify import hashlib import time import random import json import logging import requests from requests_toolbelt.multipart.encoder import MultipartEncoder
# Application object that the route decorators below register against.
app = Flask(__name__)

# Emit DEBUG-level records from the application's logger.
app.logger.setLevel(logging.DEBUG)
class SignGenerator:
    """Stamp parameter sets with a nonce and timestamp and sign them with MD5."""

    def __init__(self):
        # Alphabet for random nonces: upper-case letters, lower-case letters, digits.
        self.chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"

    def random_string(self, length=16):
        """Return ``length`` characters picked at random from ``self.chars``."""
        picked = [random.choice(self.chars) for _ in range(length)]
        return ''.join(picked)

    def sign(self, params=None):
        """Build the string that is hashed to produce the signature.

        Every entry except ``sign`` is rendered as ``key=value``; the pieces
        are ordered by key and concatenated with no separator.
        """
        source = {} if params is None else params
        # The JSON round trip gives an independent copy of the input.
        snapshot = json.loads(json.dumps(source))
        pieces = [
            f"{name}={snapshot[name]}"
            for name in sorted(snapshot.keys())
            if name != 'sign'
        ]
        return ''.join(pieces)

    def generate_signature(self, param_str):
        """Return the hexadecimal MD5 digest of ``param_str`` encoded as UTF-8."""
        digest = hashlib.md5(param_str.encode('utf-8'))
        return digest.hexdigest()

    def params(self, original_params=None, custom_nonce=None, custom_timestamp=None):
        """Return ``(signed_params, sign_string)`` for ``original_params``.

        ``nonce`` and ``timestamp`` are added to a copy of the input: the
        custom values are used when they are neither ``None`` nor ``""``,
        otherwise a random nonce and the current time are generated. A custom
        timestamp that cannot be converted with ``int()`` falls back to the
        current time. The ``sign`` entry holds the MD5 of ``sign_string``.
        """
        base = {} if original_params is None else original_params
        stamped = json.loads(json.dumps(base))

        if custom_nonce is None or custom_nonce == "":
            new_nonce = self.random_string()
            app.logger.debug(f"生成新nonce: {new_nonce}")
            stamped['nonce'] = new_nonce
        else:
            app.logger.debug(f"使用自定义nonce: {custom_nonce}")
            stamped['nonce'] = custom_nonce

        if custom_timestamp is None or custom_timestamp == "":
            current_time = int(time.time())
            app.logger.debug(f"使用当前时间戳: {current_time}")
            stamped['timestamp'] = current_time
        else:
            try:
                timestamp = int(custom_timestamp)
            except (ValueError, TypeError):
                app.logger.warning(f"无效的时间戳值: {custom_timestamp}, 使用当前时间")
                stamped['timestamp'] = int(time.time())
            else:
                app.logger.debug(f"使用自定义timestamp: {timestamp}")
                stamped['timestamp'] = timestamp

        sign_str = self.sign(stamped)
        stamped['sign'] = self.generate_signature(sign_str)
        return stamped, sign_str
class RequestSender:
    """POST form fields to a URL as a multipart/form-data body."""

    def __init__(self):
        # Headers sent with every request unless the caller overrides them.
        self.default_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63090c37) XWEB/8555 Flue',
            'Connection': 'keep-alive'
        }

    def create_multipart_data(self, form_data, boundary=None):
        """Wrap ``form_data`` in a ``MultipartEncoder``.

        Each value is converted with ``str()`` and passed as a
        ``(None, text)`` tuple. When ``boundary`` is ``None`` a random
        ``----WebKitFormBoundary…`` boundary is generated.
        """
        if boundary is None:
            boundary = f"----WebKitFormBoundary{self.random_string(16)}"

        fields = {name: (None, str(value)) for name, value in form_data.items()}
        return MultipartEncoder(fields=fields, boundary=boundary)

    def random_string(self, length=16):
        """Return ``length`` random ASCII letters and digits."""
        chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
        picked = [random.choice(chars) for _ in range(length)]
        return ''.join(picked)

    def send_request(self, url, form_data, custom_headers=None, boundary=None):
        """POST ``form_data`` to ``url`` as multipart/form-data.

        Returns a dict. On success it holds ``success=True`` plus
        ``status_code``, ``headers``, ``content`` and ``elapsed`` (seconds).
        On any exception it holds ``success=False`` and an ``error`` message;
        exceptions are not propagated to the caller.
        """
        try:
            encoder = self.create_multipart_data(form_data, boundary)

            # Defaults first, caller overrides second; the body-describing
            # headers are written last so they always match the encoder.
            request_headers = dict(self.default_headers)
            if custom_headers:
                request_headers.update(custom_headers)
            request_headers['Content-Type'] = encoder.content_type
            request_headers['Content-Length'] = str(encoder.len)

            app.logger.debug(f"发送请求到: {url}")
            app.logger.debug(f"请求头: {request_headers}")
            app.logger.debug(f"表单数据: {form_data}")

            response = requests.post(
                url,
                data=encoder,
                headers=request_headers,
                timeout=30
            )

            outcome = {
                'success': True,
                'status_code': response.status_code,
                'headers': dict(response.headers),
                'content': response.text,
                'elapsed': response.elapsed.total_seconds()
            }
            return outcome
        except requests.exceptions.RequestException as e:
            app.logger.error(f"请求发送失败: {str(e)}")
            failure = {'success': False, 'error': str(e)}
            return failure
        except Exception as e:
            app.logger.error(f"发送请求时发生未知错误: {str(e)}")
            failure = {'success': False, 'error': f'未知错误: {str(e)}'}
            return failure
# Module-level instances used by the route handlers in this file.
request_sender = RequestSender()
sign_gen = SignGenerator()
@app.route('/')
def index():
    """Home page: render the parameter input form template."""
    page = render_template('index.html')
    return page
# Prefix of the form fields that carry parameter names.
_FORM_KEY_PREFIX = 'param_key_'


def _collect_form_params(form):
    """Pair ``param_key_<idx>`` fields with their ``param_value_<idx>`` fields.

    A pair is kept only when both the name and the value are non-empty.
    """
    params = {}
    for key in form.keys():
        if not key.startswith(_FORM_KEY_PREFIX):
            continue
        name = form[key]
        if not name:
            continue
        # Strip only the leading prefix; str.replace would also remove any
        # later occurrence of it and derive the wrong index.
        idx = key[len(_FORM_KEY_PREFIX):]
        value = form.get(f'param_value_{idx}')
        if value:
            params[name] = value
    return params


def _format_local_time(timestamp):
    """Format ``timestamp`` as local ``YYYY-MM-DD HH:MM:SS``, or return None.

    Timestamps the platform cannot represent (for example values given in
    milliseconds) must not fail a request whose signature is already computed.
    """
    try:
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
    except (OverflowError, OSError, ValueError):
        return None


@app.route('/generate', methods=['POST'])
def generate_signature():
    """Handle a signature-generation request.

    Accepts either a JSON object (``params``, ``custom_nonce``,
    ``custom_timestamp``) or a form post whose parameters arrive as
    ``param_key_<idx>`` / ``param_value_<idx>`` pairs.

    Returns a JSON document with the signed parameters and the signing
    details. ``formatted_time`` is ``None`` when the timestamp cannot be
    rendered as a local time. Any failure yields
    ``{'success': False, 'error': ...}`` with HTTP status 400.
    """
    try:
        if request.is_json:
            data = request.get_json()
            app.logger.debug(f"收到JSON请求: {data}")
            if not isinstance(data, dict):
                # A JSON array or scalar has no fields to read.
                raise ValueError("请求体必须是JSON对象")
            params = data.get('params', {})
            custom_nonce = data.get('custom_nonce')
            custom_timestamp = data.get('custom_timestamp')
        else:
            app.logger.debug(f"收到表单请求: {request.form}")
            params = _collect_form_params(request.form)
            custom_nonce = request.form.get('custom_nonce')
            custom_timestamp = request.form.get('custom_timestamp')

        app.logger.debug(f"处理参数: params={params}, custom_nonce={custom_nonce}, custom_timestamp={custom_timestamp}")

        result_params, sign_str = sign_gen.params(params, custom_nonce, custom_timestamp)

        used_custom_nonce = custom_nonce is not None and custom_nonce != ""
        used_custom_timestamp = custom_timestamp is not None and custom_timestamp != ""

        result = {
            'success': True,
            'params': result_params,
            'sign_details': {
                'original_string': sign_str,
                'generated_sign': result_params['sign'],
                'timestamp': result_params['timestamp'],
                'formatted_time': _format_local_time(result_params['timestamp']),
                'nonce': result_params['nonce'],
                'used_custom_nonce': used_custom_nonce,
                'used_custom_timestamp': used_custom_timestamp
            }
        }

        app.logger.debug(f"返回结果: {result}")
        return jsonify(result)

    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        app.logger.exception(f"生成签名时出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 400
@app.route('/send_request', methods=['POST'])
def send_api_request():
    """Sign the supplied form parameters and POST them to the target server.

    Expects a JSON object with ``form_params``, ``target_url`` and the
    optional ``custom_nonce``, ``custom_timestamp``, ``custom_headers`` and
    ``boundary``. The signature travels in the ``Sign`` request header; the
    nonce and timestamp are added to the form fields.

    Returns a JSON document describing the request that was sent, the
    signing details, and either the upstream ``response`` or an ``error``.
    Any failure while handling the request itself yields
    ``{'success': False, 'error': ...}`` with HTTP status 400.
    """
    try:
        data = request.get_json()
        app.logger.debug(f"收到发送请求: {data}")
        if not isinstance(data, dict):
            # A JSON array, scalar or missing body has no fields to read.
            raise ValueError("请求体必须是JSON对象")

        # ``or {}`` makes an explicit JSON null behave like a missing key;
        # the plain .get default only covers the missing-key case.
        form_params = data.get('form_params') or {}
        custom_headers = data.get('custom_headers') or {}
        if not isinstance(form_params, dict) or not isinstance(custom_headers, dict):
            raise ValueError("form_params 和 custom_headers 必须是JSON对象")

        custom_nonce = data.get('custom_nonce')
        custom_timestamp = data.get('custom_timestamp')
        target_url = data.get('target_url', '')
        boundary = data.get('boundary', '----WebKitFormBoundaryxCjvbAAnY55N0prq')

        sign_params, sign_str = sign_gen.params(form_params, custom_nonce, custom_timestamp)

        # The form carries the nonce and timestamp; the signature itself is
        # sent in a header, not as a form field.
        final_form_data = dict(form_params)
        final_form_data['nonce'] = sign_params['nonce']
        final_form_data['timestamp'] = sign_params['timestamp']

        headers = dict(custom_headers)
        headers['Sign'] = sign_params['sign']

        app.logger.debug(f"最终表单数据: {final_form_data}")
        app.logger.debug(f"请求头: {headers}")
        app.logger.debug(f"目标URL: {target_url}")

        send_result = request_sender.send_request(target_url, final_form_data, headers, boundary)

        result = {
            'success': send_result['success'],
            'request_info': {
                'url': target_url,
                'form_data': final_form_data,
                'headers': headers,
                'signature': sign_params['sign'],
                'nonce': sign_params['nonce'],
                'timestamp': sign_params['timestamp']
            },
            'sign_details': {
                'original_string': sign_str,
                'generated_sign': sign_params['sign']
            }
        }

        if send_result['success']:
            result['response'] = {
                'status_code': send_result['status_code'],
                'elapsed_time': send_result['elapsed'],
                'content': send_result['content'],
                'headers': send_result['headers']
            }
        else:
            result['error'] = send_result['error']

        return jsonify(result)

    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        app.logger.exception(f"发送API请求时出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 400
@app.route('/batch_generate', methods=['POST'])
def batch_generate():
    """Generate signatures for several parameter sets in one request.

    Expects a JSON object whose ``param_sets`` is a list. In each set that
    is an object, ``custom_nonce`` and ``custom_timestamp`` are removed
    from the set and used as overrides for that set.

    Returns ``{'success': True, 'results': [...], 'total': n}``. Any failure
    yields ``{'success': False, 'error': ...}`` with HTTP status 400.
    """
    try:
        data = request.get_json()
        if not isinstance(data, dict):
            # A JSON array, scalar or missing body has no fields to read.
            raise ValueError("请求体必须是JSON对象")

        # ``or []`` makes an explicit JSON null behave like a missing key.
        param_sets = data.get('param_sets') or []
        if not isinstance(param_sets, list):
            raise ValueError("param_sets 必须是数组")

        results = []
        for param_set in param_sets:
            custom_nonce = None
            custom_timestamp = None
            if isinstance(param_set, dict):
                # Popping keeps the override fields out of the signed
                # parameters and out of the echoed ``input_params``.
                custom_nonce = param_set.pop('custom_nonce', None)
                custom_timestamp = param_set.pop('custom_timestamp', None)

            result_params, sign_str = sign_gen.params(param_set, custom_nonce, custom_timestamp)
            results.append({
                'input_params': param_set,
                'result': result_params,
                'sign_string': sign_str,
                'used_custom_nonce': custom_nonce is not None and custom_nonce != "",
                'used_custom_timestamp': custom_timestamp is not None and custom_timestamp != ""
            })

        return jsonify({
            'success': True,
            'results': results,
            'total': len(results)
        })

    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        app.logger.exception(f"批量生成时出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 400
if __name__ == '__main__':
    import os

    # The server listens on every interface, so the interactive debugger
    # (which allows running arbitrary Python from the browser) must not be
    # enabled unconditionally. Opt in with FLASK_DEBUG=1 for local work.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)
|