This commit is contained in:
Developer
2026-06-02 03:52:54 +08:00
parent 1cb9bc8649
commit 10df6b705c
38 changed files with 2285 additions and 167 deletions

View File

@@ -0,0 +1,170 @@
/// ============================================================
/// 闲言APP — 二维码登录WebSocket中继服务器
/// 创建时间: 2026-06-02
/// 更新时间: 2026-06-02
/// 作用: WebSocket中继服务器推送二维码状态变更给订阅客户端
/// 上次更新: 初始创建基于shelf_web_socket
///
/// 部署方式:
/// dart run qrcode_ws_relay.dart --port 9444
///
/// 架构:
/// 1. 客户端连接 ws://host:9444 并发送 {"type":"qrcode_subscribe","code":"xxx"}
/// 2. PHP后端在qrcodeConfirm/qrcodeCancel时调用 POST /notify
/// 3. 中继服务器将状态变更推送给订阅了对应code的客户端
/// ============================================================
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:shelf/shelf.dart' as shelf;
import 'package:shelf/shelf_io.dart' as shelf_io;
import 'package:shelf_router/shelf_router.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
final _subscribers = <String, List<WebSocketChannel>>{};
final _codeToChannels = <String, List<WebSocketChannel>>{};
void main(List<String> args) async {
final port = int.tryParse(args.isNotEmpty ? args.first : '9444') ?? 9444;
final router = Router();
router.get('/ws', webSocketHandler((WebSocketChannel ws) {
String? subscribedCode;
ws.stream.listen(
(data) {
try {
final json = jsonDecode(data as String) as Map<String, dynamic>;
final type = json['type'] as String? ?? '';
if (type == 'qrcode_subscribe') {
final code = json['code'] as String? ?? '';
if (code.isNotEmpty) {
subscribedCode = code;
_codeToChannels.putIfAbsent(code, () => []).add(ws);
print('[subscribe] code=$code total=${_codeToChannels[code]?.length}');
ws.sink.add(jsonEncode({
'type': 'qrcode_subscribed',
'code': code,
}));
}
} else if (type == 'qrcode_unsubscribe') {
final code = json['code'] as String? ?? '';
if (code.isNotEmpty) {
_codeToChannels[code]?.remove(ws);
if (_codeToChannels[code]?.isEmpty ?? false) {
_codeToChannels.remove(code);
}
subscribedCode = null;
print('[unsubscribe] code=$code');
}
} else if (type == 'ping') {
ws.sink.add(jsonEncode({'type': 'pong'}));
}
} catch (e) {
print('[error] parse failed: $e');
}
},
onDone: () {
if (subscribedCode != null) {
_codeToChannels[subscribedCode]?.remove(ws);
if (_codeToChannels[subscribedCode]?.isEmpty ?? false) {
_codeToChannels.remove(subscribedCode);
}
print('[disconnect] code=$subscribedCode');
}
},
onError: (e) {
print('[error] $e');
if (subscribedCode != null) {
_codeToChannels[subscribedCode]?.remove(ws);
}
},
);
}));
router.post('/notify', (shelf.Request request) async {
try {
final body = await request.readAsString();
final json = jsonDecode(body) as Map<String, dynamic>;
final code = json['code'] as String? ?? '';
final status = json['status'] as String? ?? '';
final token = json['token'] as String?;
if (code.isEmpty || status.isEmpty) {
return shelf.Response(400, body: jsonEncode({'error': 'code and status required'}));
}
final channels = _codeToChannels[code];
if (channels == null || channels.isEmpty) {
return shelf.Response.ok(jsonEncode({'sent': 0, 'message': 'no subscribers'}));
}
final message = jsonEncode({
'type': 'qrcode_status_update',
'code': code,
'status': status,
if (token != null) 'token': token,
'ts': DateTime.now().millisecondsSinceEpoch,
});
var sent = 0;
final toRemove = <WebSocketChannel>[];
for (final ch in channels) {
try {
ch.sink.add(message);
sent++;
} catch (e) {
toRemove.add(ch);
}
}
for (final ch in toRemove) {
channels.remove(ch);
}
print('[notify] code=$code status=$status sent=$sent');
return shelf.Response.ok(jsonEncode({'sent': sent}));
} catch (e) {
return shelf.Response(500, body: jsonEncode({'error': e.toString()}));
}
});
router.get('/stats', (shelf.Request request) {
final stats = <String, int>{};
for (final entry in _codeToChannels.entries) {
stats[entry.key] = entry.value.length;
}
return shelf.Response.ok(jsonEncode({
'total_codes': _codeToChannels.length,
'subscribers': stats,
}));
});
final handler = const shelf.Pipeline()
.addMiddleware(shelf.logRequests())
.addHandler(router.call);
final server = await shelf_io.serve(handler, InternetAddress.anyIPv4, port);
print('🚀 QR Code WebSocket Relay running on ws://0.0.0.0:$port/ws');
print('📡 Notify endpoint: POST http://0.0.0.0:$port/notify');
print('📊 Stats endpoint: GET http://0.0.0.0:$port/stats');
}
// WebSocketChannel stub for standalone server
// When running as standalone, import web_socket_channel directly
class WebSocketChannel {
final Stream<dynamic> stream;
final WebSocketSink sink;
WebSocketChannel(this.stream, this.sink);
}
class WebSocketSink {
final Function(String) _add;
final Function() _close;
WebSocketSink(this._add, this._close);
void add(String data) => _add(data);
void close() => _close();
}

View File

@@ -0,0 +1,255 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
* @time 2026-06-02
* @name test_qrcode_login.py
* @description 扫码登录API完整流程测试脚本
* @lastUpdate v10.3.0 使用现有测试账号; 测试登录→生成二维码→轮询→确认→获取token→取消
"""
import hmac
import hashlib
import base64
import time
import secrets
import json
import sys
try:
import requests
except ImportError:
print("need: pip3 install requests")
sys.exit(1)
BASE_URL = "https://tools.wktyl.com"
SECRET = "Xy7kP9mL2qR4wS8v"
TIMEOUT = 15
TEST_ACCOUNT = "123456"
TEST_PASSWORD = "123456"
PASS_COUNT = 0
FAIL_COUNT = 0
SKIP_COUNT = 0
def generate_receipt(action, payload, secret=SECRET):
ts = int(time.time())
nonce = secrets.token_hex(8)
payload_hash = hashlib.sha256(payload.encode()).hexdigest()[:16]
data = base64.b64encode(json.dumps({
'action': action,
'payload': payload_hash,
'ts': ts,
'nonce': nonce
}).encode()).decode()
sig = hmac.new(secret.encode(), data.encode(), hashlib.sha256).hexdigest()
return data, sig
def api_call(method, path, data=None, params=None, token=None):
url = f"{BASE_URL}{path}"
headers = {}
if token:
headers['token'] = token
try:
if method == 'GET':
resp = requests.get(url, params=params, headers=headers, timeout=TIMEOUT)
else:
resp = requests.post(url, data=data, headers=headers, timeout=TIMEOUT)
return resp.json()
except requests.exceptions.Timeout:
return {'code': -2, 'msg': 'timeout'}
except Exception as e:
return {'code': -1, 'msg': str(e)}
def assert_test(condition, test_name, detail=""):
global PASS_COUNT, FAIL_COUNT
if condition:
PASS_COUNT += 1
print(f"{test_name}")
else:
FAIL_COUNT += 1
print(f"{test_name} {detail}")
def skip_test(test_name, reason=""):
global SKIP_COUNT
SKIP_COUNT += 1
print(f" ⏭️ {test_name} {reason}")
def test_qrcode_login_flow():
print("\n" + "=" * 60)
print("🧪 扫码登录API完整流程测试")
print("=" * 60)
user_token = None
user_id = None
qr_code = None
poll_token = None
# ─── Step 1: 登录测试账号 ───
print("\n📌 Step 1: 登录测试账号")
result = api_call('POST', '/api/user_security/login', data={
'account': TEST_ACCOUNT,
'password': TEST_PASSWORD,
})
assert_test(result.get('code') == 1, "账号密码登录",
f"code={result.get('code')}, msg={result.get('msg')}")
if result.get('code') != 1:
print(f" ⚠️ 登录失败,跳过后续测试: {result.get('msg')}")
return
user_token = result.get('data', {}).get('token', '')
user_info = result.get('data', {}).get('userinfo', {})
user_id = user_info.get('id', '')
print(f" 📋 用户ID: {user_id}, Token: {user_token[:20]}...")
# ─── Step 2: 生成二维码 ───
print("\n📌 Step 2: 生成二维码")
result = api_call('GET', '/api/user_security/qrcodeGenerate')
assert_test(result.get('code') == 1, "生成二维码",
f"code={result.get('code')}, msg={result.get('msg')}")
if result.get('code') == 1:
qr_data = result.get('data', {})
qr_code = qr_data.get('code', '')
expire_time = qr_data.get('expire_time', 0)
expire_seconds = qr_data.get('expire_seconds', 0)
qrcode_url = qr_data.get('qrcode_url', '')
print(f" 📋 Code: {qr_code[:20]}...")
print(f" 📋 过期时间: {expire_time}, 有效期: {expire_seconds}")
print(f" 📋 二维码URL: {qrcode_url[:50]}...")
assert_test(len(qr_code) == 32, "Code长度为32位hex", f"实际长度: {len(qr_code)}")
assert_test(expire_seconds == 300, "有效期为300秒", f"实际: {expire_seconds}")
else:
print(f" ⚠️ 生成二维码失败,跳过后续测试")
return
# ─── Step 3: 轮询二维码状态应为pending ───
print("\n📌 Step 3: 轮询二维码状态应为pending")
result = api_call('GET', '/api/user_security/qrcodePoll', params={'code': qr_code})
assert_test(result.get('code') == 1, "轮询二维码状态",
f"code={result.get('code')}, msg={result.get('msg')}")
if result.get('code') == 1:
poll_status = result.get('data', {}).get('status', '')
assert_test(poll_status == 'pending', "状态为pending", f"实际: {poll_status}")
# ─── Step 4: 用登录的token确认扫码 ───
print("\n📌 Step 4: 用登录的token确认扫码")
result = api_call('POST', '/api/user_security/qrcodeConfirm', data={
'code': qr_code,
'platform': 'test_script',
'device_name': 'Test Device',
'app_name': 'test_qrcode_login',
}, token=user_token)
assert_test(result.get('code') == 1, "确认扫码",
f"code={result.get('code')}, msg={result.get('msg')}")
# ─── Step 5: 轮询二维码状态应为confirmed获取新token ───
print("\n📌 Step 5: 轮询二维码状态应为confirmed")
result = api_call('GET', '/api/user_security/qrcodePoll', params={'code': qr_code})
assert_test(result.get('code') == 1, "轮询confirmed状态",
f"code={result.get('code')}, msg={result.get('msg')}")
if result.get('code') == 1:
poll_status = result.get('data', {}).get('status', '')
poll_token = result.get('data', {}).get('token', '')
poll_userinfo = result.get('data', {}).get('userinfo', {})
assert_test(poll_status == 'confirmed', "状态为confirmed", f"实际: {poll_status}")
assert_test(bool(poll_token), "返回Token", "token为空")
assert_test(bool(poll_userinfo), "返回用户信息", "userinfo为空")
if poll_token:
print(f" 📋 新Token: {poll_token[:20]}...")
print(f" 📋 用户: {poll_userinfo.get('username', 'N/A')}")
# ─── Step 6: 使用新token登录 ───
print("\n📌 Step 6: 使用新token登录")
if poll_token:
result = api_call('POST', '/api/user_security/tokenLogin', data={
'token': poll_token,
})
assert_test(result.get('code') == 1, "使用新Token登录",
f"code={result.get('code')}, msg={result.get('msg')}")
if result.get('code') == 1:
login_userinfo = result.get('data', {}).get('userinfo', {})
assert_test(login_userinfo.get('id') == user_id, "用户ID一致",
f"期望: {user_id}, 实际: {login_userinfo.get('id')}")
else:
skip_test("使用新Token登录", "无Token可用")
# ─── Step 7: 测试取消二维码 ───
print("\n📌 Step 7: 测试取消二维码")
result = api_call('GET', '/api/user_security/qrcodeGenerate')
if result.get('code') == 1:
cancel_code = result.get('data', {}).get('code', '')
result = api_call('POST', '/api/user_security/qrcodeCancel', data={'code': cancel_code})
assert_test(result.get('code') == 1, "取消二维码",
f"code={result.get('code')}, msg={result.get('msg')}")
result = api_call('GET', '/api/user_security/qrcodePoll', params={'code': cancel_code})
if result.get('code') == 1:
cancel_status = result.get('data', {}).get('status', '')
assert_test(cancel_status == 'cancelled', "取消后状态为cancelled",
f"实际: {cancel_status}")
else:
skip_test("取消二维码", "无法生成新二维码")
# ─── Step 8: 测试密保问题接口 ───
print("\n📌 Step 8: 测试密保问题接口")
result = api_call('GET', '/api/user_security/secQuestions')
assert_test(result.get('code') == 1, "获取密保问题列表",
f"code={result.get('code')}, msg={result.get('msg')}")
if result.get('code') == 1:
questions = result.get('data', {}).get('questions', [])
assert_test(len(questions) == 8, "密保问题数量为8", f"实际: {len(questions)}")
def test_edge_cases():
print("\n" + "=" * 60)
print("🧪 边界情况测试")
print("=" * 60)
# 无效code轮询
print("\n📌 测试无效code轮询")
result = api_call('GET', '/api/user_security/qrcodePoll', params={'code': 'invalid_code_12345'})
assert_test(result.get('code') == 0, "无效code返回错误",
f"code={result.get('code')}, msg={result.get('msg')}")
# 无效code确认(需登录,返回401)
print("\n📌 测试无效code确认(未登录)")
result = api_call('POST', '/api/user_security/qrcodeConfirm', data={'code': 'invalid_code_12345'})
assert_test(result.get('code') in [-1, 0, 401], "未登录确认返回错误",
f"code={result.get('code')}, msg={result.get('msg')}")
# 空code参数
print("\n📌 测试空code参数")
result = api_call('GET', '/api/user_security/qrcodePoll', params={'code': ''})
assert_test(result.get('code') == 0, "空code返回错误",
f"code={result.get('code')}, msg={result.get('msg')}")
if __name__ == '__main__':
print("🚀 闲言工具箱 - 扫码登录API测试")
print(f"📅 时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🌐 基础URL: {BASE_URL}")
test_qrcode_login_flow()
test_edge_cases()
print("\n" + "=" * 60)
print("📊 测试结果汇总")
print("=" * 60)
total = PASS_COUNT + FAIL_COUNT + SKIP_COUNT
print(f" ✅ 通过: {PASS_COUNT}")
print(f" ❌ 失败: {FAIL_COUNT}")
print(f" ⏭️ 跳过: {SKIP_COUNT}")
print(f" 📋 总计: {total}")
print(f" 📈 通过率: {PASS_COUNT/total*100:.1f}%" if total > 0 else " 📈 无测试")
if FAIL_COUNT > 0:
print("\n⚠️ 存在失败测试,请检查!")
sys.exit(1)
else:
print("\n🎉 所有测试通过!")
sys.exit(0)