Files
xianyan/docs/toolsapi/scripts/test_cross_network_pairing.py
Developer 6f5400ec4b feat: 5.4.0版本大更新,新增多端桌面小组件与多项功能优化
- 重构「灵感」模块为「发现」模块,统一页面命名与文案
- 新增flutter_tts语音朗读依赖与鸿蒙Nearby配对方式
- 添加Android/iOS/鸿蒙全平台桌面小组件支持(7种类型)
- 完善文件传输模块,新增画布邀请消息与删除会话功能
- 优化协作画布光标广播节流逻辑,修复已知bug
- 更新应用英文名与隐私政策入口,新增翻译API抽象层
- 移除用户中心多余的加号按钮,完善空状态组件类型
2026-05-19 05:39:50 +08:00

348 lines
14 KiB
Python

"""
闲言APP - 跨网配对全流程测试脚本
创建时间: 2026-05-19
作用: 测试配对码/雷达扫描/QR配对信令
每接口3-5个测试场景
"""
import asyncio
import json
import time
import sys
import random
import string
# WebSocket endpoint that ws_connect() dials for every test in this script.
WS_URL = 'wss://tools.wktyl.com:9443'
def generate_pairing_code_chars():
    """Return the alphabet that a pairing code may be drawn from.

    The result is the digits 2-9 followed by the upper-case letters with
    I, L and O left out.
    """
    digits = '23456789'
    letters = 'ABCDEFGHJKMNPQRSTUVWXYZ'
    return digits + letters
async def ws_connect():
    """Open a WebSocket connection to ``WS_URL`` and return it.

    ``websockets`` is imported inside the function rather than at module
    load. ``ping_interval=None`` is forwarded to ``websockets.connect``.
    """
    import websockets

    connection = await websockets.connect(WS_URL, ping_interval=None)
    return connection
async def register_device(ws, device_id, alias='TestDevice', fingerprint=None, user_id=None, ip_city=None, ip_range=None):
    """Send a ``register`` message and wait for the ``registered`` reply.

    Args:
        ws: Connection object exposing awaitable ``send`` and ``recv``.
        device_id: Value sent as ``deviceId``.
        alias: Value sent as ``alias``.
        fingerprint: Sent as ``fingerprint`` when truthy.
        user_id: Sent as ``userId`` when truthy.
        ip_city: Sent as ``ipCity`` when truthy.
        ip_range: Sent as ``ipRange`` when truthy.

    Returns:
        The first decoded message whose ``type`` is ``registered``. If none
        of the first ten messages matches, the tenth message is returned.

    Raises:
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    payload = {
        'deviceId': device_id,
        'alias': alias,
        'deviceType': 'mobile',
        'protocol': 'xianyan-v1',
    }
    # Optional fields are appended in this fixed order, and only when truthy.
    optional_fields = (
        ('fingerprint', fingerprint),
        ('userId', user_id),
        ('ipCity', ip_city),
        ('ipRange', ip_range),
    )
    payload.update({key: value for key, value in optional_fields if value})
    await ws.send(json.dumps({'type': 'register', 'data': payload}))

    remaining = 10
    while True:
        reply = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
        remaining -= 1
        # Stop on the expected reply, or hand back the last message read.
        if reply.get('type') == 'registered' or remaining == 0:
            return reply
async def send_heartbeat(ws):
    """Send a ``heartbeat`` message and return the next decoded reply.

    The message carries the current wall-clock time in milliseconds as
    ``data.timestamp``. Whatever message arrives next is returned; its type
    is not checked.

    Raises:
        asyncio.TimeoutError: No message arrived within 10 seconds.
    """
    now_ms = int(time.time() * 1000)
    heartbeat = {'type': 'heartbeat', 'data': {'timestamp': now_ms}}
    await ws.send(json.dumps(heartbeat))
    raw_reply = await asyncio.wait_for(ws.recv(), timeout=10)
    return json.loads(raw_reply)
# ===== 配对码测试 =====
async def test_pairing_code_create_normal():
    """Scenario 1: create a pairing code normally.

    Registers a device, sends ``pairing-code-create`` and checks that the
    ``pairing-code-created`` reply carries ``expiresAt`` and a six-character
    ``pairingCode`` made only of ``generate_pairing_code_chars()`` characters.

    Raises:
        AssertionError: A check on the server reply failed.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    ws = await ws_connect()
    try:
        reg = await register_device(ws, f'dev-pair-create-{stamp}', alias='DeviceA', fingerprint=f'fp_create_{stamp}')
        assert reg.get('type') == 'registered', f'Register failed: {reg}'
        await ws.send(json.dumps({
            'type': 'pairing-code-create',
            'data': {'alias': 'DeviceA', 'deviceType': 'mobile'},
        }))
        response = None
        # Skip messages of other types; give up after five messages.
        for _ in range(5):
            msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            if msg.get('type') == 'pairing-code-created':
                response = msg
                break
        assert response is not None, 'No pairing-code-created response received'
        assert 'pairingCode' in response, f'Missing pairingCode in response: {response}'
        assert len(response['pairingCode']) == 6, f'Pairing code length != 6: {response["pairingCode"]}'
        assert 'expiresAt' in response, 'Missing expiresAt in response'
        valid_chars = set(generate_pairing_code_chars())
        for c in response['pairingCode']:
            assert c in valid_chars, f'Invalid char in pairing code: {c}'
        print(f' [PASS] test_pairing_code_create_normal: code={response["pairingCode"]}')
    finally:
        # Close the socket even when an assertion or timeout above raises.
        await ws.close()
async def test_pairing_code_create_duplicate():
    """Scenario 2: creating a code twice yields a different second code.

    Sends ``pairing-code-create`` twice on the same connection and asserts
    that both replies arrived and that the two ``pairingCode`` values differ.

    Raises:
        AssertionError: A code is missing or both codes are equal.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    ws = await ws_connect()
    try:
        await register_device(ws, f'dev-pair-dup-{stamp}', fingerprint=f'fp_dup_{stamp}')
        codes = []
        for _round in range(2):
            await ws.send(json.dumps({'type': 'pairing-code-create', 'data': {'alias': 'DeviceDup'}}))
            code = None
            # Skip messages of other types; give up after five messages.
            for _attempt in range(5):
                msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
                if msg.get('type') == 'pairing-code-created':
                    code = msg['pairingCode']
                    break
            codes.append(code)
        code1, code2 = codes
        assert code1 is not None and code2 is not None, 'Failed to create codes'
        assert code1 != code2, f'Duplicate create should generate different codes: {code1} vs {code2}'
        print(f' [PASS] test_pairing_code_create_duplicate: {code1} -> {code2}')
    finally:
        # Close the socket even when an assertion or timeout above raises.
        await ws.close()
async def test_pairing_code_join_normal():
    """Scenario 3: a second device joins with a valid pairing code.

    Device A creates a code and device B joins with it. Both sockets must
    then receive ``pairing-matched`` with ``success`` true, and B's message
    must contain ``peer``.

    Raises:
        AssertionError: A check on the server replies failed.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    ws_a = ws_b = None
    try:
        ws_a = await ws_connect()
        ws_b = await ws_connect()
        await register_device(ws_a, f'dev-pair-join-a-{stamp}', alias='DeviceA', fingerprint=f'fp_join_a_{stamp}')
        await register_device(ws_b, f'dev-pair-join-b-{stamp}', alias='DeviceB', fingerprint=f'fp_join_b_{stamp}')
        await ws_a.send(json.dumps({'type': 'pairing-code-create', 'data': {'alias': 'DeviceA'}}))
        code = None
        for _ in range(5):
            msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
            if msg.get('type') == 'pairing-code-created':
                code = msg['pairingCode']
                break
        assert code is not None, 'Failed to create pairing code'
        await ws_b.send(json.dumps({'type': 'pairing-code-join', 'data': {'pairingCode': code}}))
        # Read the joiner's socket first, then the creator's.
        matched = {}
        for side, sock in (('b', ws_b), ('a', ws_a)):
            matched[side] = None
            for _ in range(5):
                msg = json.loads(await asyncio.wait_for(sock.recv(), timeout=10))
                if msg.get('type') == 'pairing-matched':
                    matched[side] = msg
                    break
        matched_a, matched_b = matched['a'], matched['b']
        assert matched_b is not None and matched_b.get('success') == True, f'DeviceB match failed: {matched_b}'
        assert matched_a is not None and matched_a.get('success') == True, f'DeviceA match failed: {matched_a}'
        assert 'peer' in matched_b, 'Missing peer info in matched response'
        print(f' [PASS] test_pairing_code_join_normal: code={code}')
    finally:
        # Close whichever sockets were opened, even on failure.
        for sock in (ws_a, ws_b):
            if sock is not None:
                await sock.close()
async def test_pairing_code_join_wrong_code():
    """Scenario 4: joining with a code that was never created.

    Sends ``pairing-code-join`` with ``ZZZZZZ`` and expects a
    ``pairing-matched`` reply whose ``success`` is false.

    Raises:
        AssertionError: No reply arrived or the join was not rejected.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    ws = await ws_connect()
    try:
        await register_device(ws, f'dev-pair-wrong-{stamp}', fingerprint=f'fp_wrong_{stamp}')
        await ws.send(json.dumps({'type': 'pairing-code-join', 'data': {'pairingCode': 'ZZZZZZ'}}))
        response = None
        # Skip messages of other types; give up after five messages.
        for _ in range(5):
            msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            if msg.get('type') == 'pairing-matched':
                response = msg
                break
        assert response is not None, 'No response for wrong code'
        assert response.get('success') == False, f'Wrong code should fail: {response}'
        print(' [PASS] test_pairing_code_join_wrong_code')
    finally:
        # Close the socket even when an assertion or timeout above raises.
        await ws.close()
async def test_pairing_code_join_self():
    """Scenario 5: a device tries to join with its own pairing code.

    Creates a code, then sends ``pairing-code-join`` with that same code on
    the same connection and expects ``pairing-matched`` with ``success``
    false.

    Raises:
        AssertionError: No code was created or the self-join was not rejected.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    ws = await ws_connect()
    try:
        await register_device(ws, f'dev-pair-self-{stamp}', fingerprint=f'fp_self_{stamp}')
        await ws.send(json.dumps({'type': 'pairing-code-create', 'data': {'alias': 'SelfDevice'}}))
        code = None
        for _ in range(5):
            msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            if msg.get('type') == 'pairing-code-created':
                code = msg['pairingCode']
                break
        # Without a code the join below would send null and test nothing useful.
        assert code is not None, 'Failed to create pairing code'
        await ws.send(json.dumps({'type': 'pairing-code-join', 'data': {'pairingCode': code}}))
        response = None
        for _ in range(5):
            msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            if msg.get('type') == 'pairing-matched':
                response = msg
                break
        assert response is not None and response.get('success') == False, f'Self-join should fail: {response}'
        print(' [PASS] test_pairing_code_join_self')
    finally:
        # Close the socket even when an assertion or timeout above raises.
        await ws.close()
# ===== 雷达扫描测试 =====
async def test_radar_broadcast_normal():
    """Scenario 1: a normal radar broadcast.

    Registers a device with city and IP range, sends ``radar-broadcast``
    and expects a ``radar-broadcast`` reply with ``success`` true.

    Raises:
        AssertionError: No reply arrived or it did not report success.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    ws = await ws_connect()
    try:
        await register_device(ws, f'dev-radar-bc-{stamp}', alias='RadarDevice', fingerprint=f'fp_radar_{stamp}', ip_city='北京', ip_range='110.123.45.0/24')
        await ws.send(json.dumps({
            'type': 'radar-broadcast',
            'data': {'alias': 'RadarDevice', 'ipCity': '北京', 'ipRange': '110.123.45.0/24'},
        }))
        response = None
        # Skip messages of other types; give up after five messages.
        for _ in range(5):
            msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            if msg.get('type') == 'radar-broadcast':
                response = msg
                break
        assert response is not None and response.get('success') == True, f'Radar broadcast failed: {response}'
        print(' [PASS] test_radar_broadcast_normal')
    finally:
        # Close the socket even when an assertion or timeout above raises.
        await ws.close()
async def test_radar_scan_same_network():
    """Scenario 2: scanning from the same IP range.

    Two devices register and broadcast with the same city and IP range.
    Device A then sends ``radar-scan`` and the ``radar-devices`` reply must
    list at least one device whose ``matchType`` is ``same-network``.

    Raises:
        AssertionError: No reply arrived or no same-network device was listed.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    ws_a = ws_b = None
    try:
        ws_a = await ws_connect()
        ws_b = await ws_connect()
        await register_device(ws_a, f'dev-radar-a-{stamp}', alias='RadarA', fingerprint=f'fp_radar_a_{stamp}', ip_city='北京', ip_range='110.123.45.0/24')
        await register_device(ws_b, f'dev-radar-b-{stamp}', alias='RadarB', fingerprint=f'fp_radar_b_{stamp}', ip_city='北京', ip_range='110.123.45.0/24')
        await ws_a.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'RadarA', 'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
        await ws_b.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'RadarB', 'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
        # Discard three messages per socket before scanning.
        # NOTE(review): presumably the server pushes at least three messages
        # to each socket after the broadcasts; if it sends fewer, this
        # raises TimeoutError. Confirm against the server.
        for _ in range(3):
            await asyncio.wait_for(ws_a.recv(), timeout=10)
            await asyncio.wait_for(ws_b.recv(), timeout=10)
        await ws_a.send(json.dumps({'type': 'radar-scan', 'data': {'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
        response = None
        for _ in range(5):
            msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
            if msg.get('type') == 'radar-devices':
                response = msg
                break
        assert response is not None, 'No radar-devices response'
        devices = response.get('devices', [])
        assert len(devices) > 0, 'No devices found in same network scan'
        same_net = [d for d in devices if d.get('matchType') == 'same-network']
        assert len(same_net) > 0, f'No same-network devices found: {devices}'
        print(f' [PASS] test_radar_scan_same_network: found {len(same_net)} same-network device(s)')
    finally:
        # Close whichever sockets were opened, even on failure.
        for sock in (ws_a, ws_b):
            if sock is not None:
                await sock.close()
async def test_radar_scan_no_match():
    """Scenario 3: a scan that matches no nearby device.

    Scans with a city and IP range that no device in this script uses. The
    ``radar-devices`` reply may still list devices, but none may have
    ``matchType`` ``same-network`` or ``same-city``.

    Raises:
        AssertionError: No reply arrived or a local match was listed.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    ws = await ws_connect()
    try:
        await register_device(ws, f'dev-radar-nomatch-{stamp}', alias='LonelyDevice', fingerprint=f'fp_lonely_{stamp}')
        await ws.send(json.dumps({'type': 'radar-scan', 'data': {'ipCity': '火星', 'ipRange': '0.0.0.0/24'}}))
        response = None
        # Skip messages of other types; give up after five messages.
        for _ in range(5):
            msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            if msg.get('type') == 'radar-devices':
                response = msg
                break
        assert response is not None, 'No radar-devices response'
        devices = response.get('devices', [])
        same_network = [d for d in devices if d.get('matchType') == 'same-network']
        same_city = [d for d in devices if d.get('matchType') == 'same-city']
        assert len(same_network) == 0 and len(same_city) == 0, f'Should find no same-network/city devices: {response}'
        print(f' [PASS] test_radar_scan_no_match (found {len(devices)} remote devices, 0 local)')
    finally:
        # Close the socket even when an assertion or timeout above raises.
        await ws.close()
async def test_radar_broadcast_update():
    """Scenario 4: a repeated broadcast updates the earlier one.

    Broadcasts once as ``OldName``, then again as ``NewName`` with a
    different city. Only the second ``radar-broadcast`` reply is checked; it
    must report ``success`` true.

    Raises:
        AssertionError: The second broadcast got no successful reply.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    ws = await ws_connect()
    try:
        await register_device(ws, f'dev-radar-update-{stamp}', alias='UpdateDevice', fingerprint=f'fp_update_{stamp}')
        await ws.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'OldName', 'ipCity': '上海'}}))
        # Consume up to three messages until the first broadcast's reply;
        # its content is deliberately not asserted.
        for _ in range(3):
            msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            if msg.get('type') == 'radar-broadcast':
                break
        await ws.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'NewName', 'ipCity': '北京'}}))
        response = None
        for _ in range(3):
            msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            if msg.get('type') == 'radar-broadcast':
                response = msg
                break
        assert response is not None and response.get('success') == True, f'Update broadcast failed: {response}'
        print(' [PASS] test_radar_broadcast_update')
    finally:
        # Close the socket even when an assertion or timeout above raises.
        await ws.close()
async def test_radar_scan_my_device():
    """Scenario 5: discovering a device that shares the same userId.

    Two devices register with the same ``userId`` but different cities and
    IP ranges, and both broadcast. Device A then scans, and the
    ``radar-devices`` reply must list at least one device whose
    ``matchType`` is ``my-device``.

    Raises:
        AssertionError: No reply arrived or no my-device entry was listed.
        asyncio.TimeoutError: A single ``recv`` took longer than 10 seconds.
    """
    stamp = int(time.time())
    uid = f'shared-user-{stamp}'
    ws_a = ws_b = None
    try:
        ws_a = await ws_connect()
        ws_b = await ws_connect()
        await register_device(ws_a, f'dev-radar-ua-{stamp}', alias='MyPhone', fingerprint=f'fp_ua_{stamp}', user_id=uid, ip_city='北京', ip_range='110.123.45.0/24')
        await register_device(ws_b, f'dev-radar-ub-{stamp}', alias='MyPad', fingerprint=f'fp_ub_{stamp}', user_id=uid, ip_city='上海', ip_range='114.114.0.0/24')
        await ws_a.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'MyPhone', 'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
        await ws_b.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'MyPad', 'ipCity': '上海', 'ipRange': '114.114.0.0/24'}}))
        # Discard three messages per socket before scanning.
        # NOTE(review): presumably the server pushes at least three messages
        # to each socket after the broadcasts; if it sends fewer, this
        # raises TimeoutError. Confirm against the server.
        for _ in range(3):
            await asyncio.wait_for(ws_a.recv(), timeout=10)
            await asyncio.wait_for(ws_b.recv(), timeout=10)
        await ws_a.send(json.dumps({'type': 'radar-scan', 'data': {'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
        response = None
        for _ in range(5):
            msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
            if msg.get('type') == 'radar-devices':
                response = msg
                break
        assert response is not None, 'No radar-devices response'
        my_devices = [d for d in response.get('devices', []) if d.get('matchType') == 'my-device']
        assert len(my_devices) > 0, f'Should find my-device: {response}'
        print(f' [PASS] test_radar_scan_my_device: found {len(my_devices)} my-device(s)')
    finally:
        # Close whichever sockets were opened, even on failure.
        for sock in (ws_a, ws_b):
            if sock is not None:
                await sock.close()
# ===== 主函数 =====
async def run_all_tests():
    """Run every pairing-code and radar test in order.

    Prints a banner before each group and a final banner once every test
    has returned. The first exception raised by a test propagates to the
    caller and stops the run.
    """
    suites = (
        ('\n===== 配对码测试 =====', (
            test_pairing_code_create_normal,
            test_pairing_code_create_duplicate,
            test_pairing_code_join_normal,
            test_pairing_code_join_wrong_code,
            test_pairing_code_join_self,
        )),
        ('\n===== 雷达扫描测试 =====', (
            test_radar_broadcast_normal,
            test_radar_scan_same_network,
            test_radar_scan_no_match,
            test_radar_broadcast_update,
            test_radar_scan_my_device,
        )),
    )
    for banner, cases in suites:
        print(banner)
        for case in cases:
            await case()
    print('\n===== 全部测试通过! =====')
# Run the whole suite on a fresh event loop when executed as a script.
if __name__ == '__main__':
    asyncio.run(run_all_tests())