feat: 5.4.0版本大更新,新增多端桌面小组件与多项功能优化
- 重构「灵感」模块为「发现」模块,统一页面命名与文案 - 新增flutter_tts语音朗读依赖与鸿蒙Nearby配对方式 - 添加Android/iOS/鸿蒙全平台桌面小组件支持(7种类型) - 完善文件传输模块,新增画布邀请消息与删除会话功能 - 优化协作画布光标广播节流逻辑,修复已知bug - 更新应用英文名与隐私政策入口,新增翻译API抽象层 - 移除用户中心多余的加号按钮,完善空状态组件类型
This commit is contained in:
251
docs/toolsapi/scripts/test_canvas_sync.py
Normal file
251
docs/toolsapi/scripts/test_canvas_sync.py
Normal file
@@ -0,0 +1,251 @@
|
||||
"""
|
||||
闲言APP - 画布同步测试脚本
|
||||
创建时间: 2026-05-19
|
||||
作用: 测试画布笔画同步/光标/快照/加入离开
|
||||
每接口3-5个测试场景
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
|
||||
WS_URL = 'wss://tools.wktyl.com:9443'
|
||||
|
||||
async def ws_connect():
|
||||
import websockets
|
||||
return await websockets.connect(WS_URL, ping_interval=None)
|
||||
|
||||
async def register_device(ws, device_id, alias='TestDevice', fingerprint=None):
|
||||
msg = {
|
||||
'type': 'register',
|
||||
'data': {'deviceId': device_id, 'alias': alias, 'deviceType': 'mobile', 'protocol': 'xianyan-v1'}
|
||||
}
|
||||
if fingerprint:
|
||||
msg['data']['fingerprint'] = fingerprint
|
||||
await ws.send(json.dumps(msg))
|
||||
for _ in range(10):
|
||||
response = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if response.get('type') == 'registered':
|
||||
return response
|
||||
return response
|
||||
|
||||
async def test_canvas_join_and_stroke():
|
||||
"""场景1: 加入画布+笔画同步"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_canvas_a_{int(time.time())}'
|
||||
fp_b = f'fp_canvas_b_{int(time.time())}'
|
||||
await register_device(ws_a, f'dev-canvas-a-{int(time.time())}', alias='DrawerA', fingerprint=fp_a)
|
||||
await register_device(ws_b, f'dev-canvas-b-{int(time.time())}', alias='DrawerB', fingerprint=fp_b)
|
||||
|
||||
canvas_id = f'canvas-test-{int(time.time())}'
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
for _ in range(3):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
await ws_b.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'canvas-stroke',
|
||||
'payload': {
|
||||
'canvasId': canvas_id,
|
||||
'stroke': {'strokeId': 's1', 'points': [[10, 20], [30, 40]], 'color': '#FF0000', 'width': 2.0, 'tool': 'pen'}
|
||||
}
|
||||
}))
|
||||
|
||||
received = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-stroke':
|
||||
received = True
|
||||
assert msg['payload']['stroke']['strokeId'] == 's1', f'Wrong stroke: {msg}'
|
||||
break
|
||||
|
||||
assert received, 'canvas-stroke not received by peer'
|
||||
print(f' [PASS] test_canvas_join_and_stroke')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_canvas_cursor():
|
||||
"""场景2: 远程光标同步"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_cursor_a_{int(time.time())}'
|
||||
fp_b = f'fp_cursor_b_{int(time.time())}'
|
||||
await register_device(ws_a, f'dev-cursor-a-{int(time.time())}', alias='CursorA', fingerprint=fp_a)
|
||||
await register_device(ws_b, f'dev-cursor-b-{int(time.time())}', alias='CursorB', fingerprint=fp_b)
|
||||
|
||||
canvas_id = f'canvas-cursor-{int(time.time())}'
|
||||
await ws_a.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
for _ in range(3):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
await ws_b.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'canvas-cursor',
|
||||
'payload': {'canvasId': canvas_id, 'x': 100, 'y': 200}
|
||||
}))
|
||||
|
||||
received = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-cursor':
|
||||
received = True
|
||||
break
|
||||
|
||||
assert received, 'canvas-cursor not received'
|
||||
print(f' [PASS] test_canvas_cursor')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_canvas_snapshot():
|
||||
"""场景3: 画布快照请求"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_snap_a_{int(time.time())}'
|
||||
fp_b = f'fp_snap_b_{int(time.time())}'
|
||||
await register_device(ws_a, f'dev-snap-a-{int(time.time())}', alias='SnapA', fingerprint=fp_a)
|
||||
await register_device(ws_b, f'dev-snap-b-{int(time.time())}', alias='SnapB', fingerprint=fp_b)
|
||||
|
||||
canvas_id = f'canvas-snap-{int(time.time())}'
|
||||
await ws_a.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
for _ in range(3):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
await ws_b.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'canvas-snapshot',
|
||||
'payload': {'canvasId': canvas_id, 'action': 'request'}
|
||||
}))
|
||||
|
||||
received = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-snapshot' and msg.get('payload', {}).get('action') == 'request':
|
||||
received = True
|
||||
break
|
||||
|
||||
assert received, 'canvas-snapshot request not received'
|
||||
print(f' [PASS] test_canvas_snapshot')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_canvas_leave():
|
||||
"""场景4: 离开画布通知"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_leave_a_{int(time.time())}'
|
||||
fp_b = f'fp_leave_b_{int(time.time())}'
|
||||
await register_device(ws_a, f'dev-leave-a-{int(time.time())}', alias='LeaveA', fingerprint=fp_a)
|
||||
await register_device(ws_b, f'dev-leave-b-{int(time.time())}', alias='LeaveB', fingerprint=fp_b)
|
||||
|
||||
canvas_id = f'canvas-leave-{int(time.time())}'
|
||||
await ws_a.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
for _ in range(3):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
await ws_b.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'canvas-leave', 'payload': {'canvasId': canvas_id}}))
|
||||
|
||||
received = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-leave':
|
||||
received = True
|
||||
break
|
||||
|
||||
assert received, 'canvas-leave not received'
|
||||
print(f' [PASS] test_canvas_leave')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_canvas_rapid_strokes():
|
||||
"""场景5: 快速连续笔画"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_rapid_a_{int(time.time())}'
|
||||
fp_b = f'fp_rapid_b_{int(time.time())}'
|
||||
await register_device(ws_a, f'dev-rapid-a-{int(time.time())}', alias='RapidA', fingerprint=fp_a)
|
||||
await register_device(ws_b, f'dev-rapid-b-{int(time.time())}', alias='RapidB', fingerprint=fp_b)
|
||||
|
||||
canvas_id = f'canvas-rapid-{int(time.time())}'
|
||||
await ws_a.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
for _ in range(3):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
await ws_b.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-join':
|
||||
break
|
||||
|
||||
stroke_count = 10
|
||||
for i in range(stroke_count):
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'canvas-stroke',
|
||||
'payload': {
|
||||
'canvasId': canvas_id,
|
||||
'stroke': {'strokeId': f'rapid-{i}', 'points': [[i * 10, i * 20]], 'color': '#000000', 'width': 1.0, 'tool': 'pen'}
|
||||
}
|
||||
}))
|
||||
|
||||
received_count = 0
|
||||
try:
|
||||
for _ in range(stroke_count + 5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=5))
|
||||
if msg.get('type') == 'canvas-stroke':
|
||||
received_count += 1
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
|
||||
assert received_count >= stroke_count * 0.8, f'Too many strokes lost: {received_count}/{stroke_count}'
|
||||
print(f' [PASS] test_canvas_rapid_strokes: {received_count}/{stroke_count} received')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def run_all_tests():
|
||||
print('\n===== 画布同步测试 =====')
|
||||
await test_canvas_join_and_stroke()
|
||||
await test_canvas_cursor()
|
||||
await test_canvas_snapshot()
|
||||
await test_canvas_leave()
|
||||
await test_canvas_rapid_strokes()
|
||||
print('\n===== 全部测试通过! =====')
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(run_all_tests())
|
||||
347
docs/toolsapi/scripts/test_cross_network_pairing.py
Normal file
347
docs/toolsapi/scripts/test_cross_network_pairing.py
Normal file
@@ -0,0 +1,347 @@
|
||||
"""
|
||||
闲言APP - 跨网配对全流程测试脚本
|
||||
创建时间: 2026-05-19
|
||||
作用: 测试配对码/雷达扫描/QR配对信令
|
||||
每接口3-5个测试场景
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
import sys
|
||||
import random
|
||||
import string
|
||||
|
||||
WS_URL = 'wss://tools.wktyl.com:9443'
|
||||
|
||||
def generate_pairing_code_chars():
|
||||
return '23456789ABCDEFGHJKMNPQRSTUVWXYZ'
|
||||
|
||||
async def ws_connect():
|
||||
import websockets
|
||||
return await websockets.connect(WS_URL, ping_interval=None)
|
||||
|
||||
async def register_device(ws, device_id, alias='TestDevice', fingerprint=None, user_id=None, ip_city=None, ip_range=None):
|
||||
msg = {
|
||||
'type': 'register',
|
||||
'data': {
|
||||
'deviceId': device_id,
|
||||
'alias': alias,
|
||||
'deviceType': 'mobile',
|
||||
'protocol': 'xianyan-v1'
|
||||
}
|
||||
}
|
||||
if fingerprint:
|
||||
msg['data']['fingerprint'] = fingerprint
|
||||
if user_id:
|
||||
msg['data']['userId'] = user_id
|
||||
if ip_city:
|
||||
msg['data']['ipCity'] = ip_city
|
||||
if ip_range:
|
||||
msg['data']['ipRange'] = ip_range
|
||||
await ws.send(json.dumps(msg))
|
||||
for _ in range(10):
|
||||
response = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if response.get('type') == 'registered':
|
||||
return response
|
||||
return response
|
||||
|
||||
async def send_heartbeat(ws):
|
||||
await ws.send(json.dumps({'type': 'heartbeat', 'data': {'timestamp': int(time.time() * 1000)}}))
|
||||
response = await asyncio.wait_for(ws.recv(), timeout=10)
|
||||
return json.loads(response)
|
||||
|
||||
# ===== 配对码测试 =====
|
||||
|
||||
async def test_pairing_code_create_normal():
|
||||
"""场景1: 正常创建配对码"""
|
||||
ws = await ws_connect()
|
||||
reg = await register_device(ws, f'dev-pair-create-{int(time.time())}', alias='DeviceA', fingerprint=f'fp_create_{int(time.time())}')
|
||||
assert reg.get('type') == 'registered', f'Register failed: {reg}'
|
||||
|
||||
await ws.send(json.dumps({
|
||||
'type': 'pairing-code-create',
|
||||
'data': {'alias': 'DeviceA', 'deviceType': 'mobile'}
|
||||
}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-code-created':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, f'No pairing-code-created response received'
|
||||
assert 'pairingCode' in response, f'Missing pairingCode in response: {response}'
|
||||
assert len(response['pairingCode']) == 6, f'Pairing code length != 6: {response["pairingCode"]}'
|
||||
assert 'expiresAt' in response, f'Missing expiresAt in response'
|
||||
valid_chars = set(generate_pairing_code_chars())
|
||||
for c in response['pairingCode']:
|
||||
assert c in valid_chars, f'Invalid char in pairing code: {c}'
|
||||
print(f' [PASS] test_pairing_code_create_normal: code={response["pairingCode"]}')
|
||||
await ws.close()
|
||||
|
||||
async def test_pairing_code_create_duplicate():
|
||||
"""场景2: 重复创建覆盖旧码"""
|
||||
ws = await ws_connect()
|
||||
reg = await register_device(ws, f'dev-pair-dup-{int(time.time())}', fingerprint=f'fp_dup_{int(time.time())}')
|
||||
|
||||
await ws.send(json.dumps({'type': 'pairing-code-create', 'data': {'alias': 'DeviceDup'}}))
|
||||
code1 = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-code-created':
|
||||
code1 = msg['pairingCode']
|
||||
break
|
||||
|
||||
await ws.send(json.dumps({'type': 'pairing-code-create', 'data': {'alias': 'DeviceDup'}}))
|
||||
code2 = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-code-created':
|
||||
code2 = msg['pairingCode']
|
||||
break
|
||||
|
||||
assert code1 is not None and code2 is not None, 'Failed to create codes'
|
||||
assert code1 != code2, f'Duplicate create should generate different codes: {code1} vs {code2}'
|
||||
print(f' [PASS] test_pairing_code_create_duplicate: {code1} -> {code2}')
|
||||
await ws.close()
|
||||
|
||||
async def test_pairing_code_join_normal():
|
||||
"""场景3: 正常配对码匹配"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
reg_a = await register_device(ws_a, f'dev-pair-join-a-{int(time.time())}', alias='DeviceA', fingerprint=f'fp_join_a_{int(time.time())}')
|
||||
reg_b = await register_device(ws_b, f'dev-pair-join-b-{int(time.time())}', alias='DeviceB', fingerprint=f'fp_join_b_{int(time.time())}')
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'pairing-code-create', 'data': {'alias': 'DeviceA'}}))
|
||||
code = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-code-created':
|
||||
code = msg['pairingCode']
|
||||
break
|
||||
|
||||
assert code is not None, 'Failed to create pairing code'
|
||||
|
||||
await ws_b.send(json.dumps({'type': 'pairing-code-join', 'data': {'pairingCode': code}}))
|
||||
|
||||
matched_b = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-matched':
|
||||
matched_b = msg
|
||||
break
|
||||
|
||||
matched_a = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-matched':
|
||||
matched_a = msg
|
||||
break
|
||||
|
||||
assert matched_b is not None and matched_b.get('success') == True, f'DeviceB match failed: {matched_b}'
|
||||
assert matched_a is not None and matched_a.get('success') == True, f'DeviceA match failed: {matched_a}'
|
||||
assert 'peer' in matched_b, f'Missing peer info in matched response'
|
||||
print(f' [PASS] test_pairing_code_join_normal: code={code}')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_pairing_code_join_wrong_code():
|
||||
"""场景4: 错误配对码"""
|
||||
ws = await ws_connect()
|
||||
reg = await register_device(ws, f'dev-pair-wrong-{int(time.time())}', fingerprint=f'fp_wrong_{int(time.time())}')
|
||||
|
||||
await ws.send(json.dumps({'type': 'pairing-code-join', 'data': {'pairingCode': 'ZZZZZZ'}}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-matched':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, 'No response for wrong code'
|
||||
assert response.get('success') == False, f'Wrong code should fail: {response}'
|
||||
print(f' [PASS] test_pairing_code_join_wrong_code')
|
||||
await ws.close()
|
||||
|
||||
async def test_pairing_code_join_self():
|
||||
"""场景5: 使用自己创建的配对码"""
|
||||
ws = await ws_connect()
|
||||
reg = await register_device(ws, f'dev-pair-self-{int(time.time())}', fingerprint=f'fp_self_{int(time.time())}')
|
||||
|
||||
await ws.send(json.dumps({'type': 'pairing-code-create', 'data': {'alias': 'SelfDevice'}}))
|
||||
code = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-code-created':
|
||||
code = msg['pairingCode']
|
||||
break
|
||||
|
||||
await ws.send(json.dumps({'type': 'pairing-code-join', 'data': {'pairingCode': code}}))
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-matched':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None and response.get('success') == False, f'Self-join should fail: {response}'
|
||||
print(f' [PASS] test_pairing_code_join_self')
|
||||
await ws.close()
|
||||
|
||||
# ===== 雷达扫描测试 =====
|
||||
|
||||
async def test_radar_broadcast_normal():
|
||||
"""场景1: 正常雷达广播"""
|
||||
ws = await ws_connect()
|
||||
reg = await register_device(ws, f'dev-radar-bc-{int(time.time())}', alias='RadarDevice', fingerprint=f'fp_radar_{int(time.time())}', ip_city='北京', ip_range='110.123.45.0/24')
|
||||
|
||||
await ws.send(json.dumps({
|
||||
'type': 'radar-broadcast',
|
||||
'data': {'alias': 'RadarDevice', 'ipCity': '北京', 'ipRange': '110.123.45.0/24'}
|
||||
}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'radar-broadcast':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None and response.get('success') == True, f'Radar broadcast failed: {response}'
|
||||
print(f' [PASS] test_radar_broadcast_normal')
|
||||
await ws.close()
|
||||
|
||||
async def test_radar_scan_same_network():
|
||||
"""场景2: 同IP段扫描"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
await register_device(ws_a, f'dev-radar-a-{int(time.time())}', alias='RadarA', fingerprint=f'fp_radar_a_{int(time.time())}', ip_city='北京', ip_range='110.123.45.0/24')
|
||||
await register_device(ws_b, f'dev-radar-b-{int(time.time())}', alias='RadarB', fingerprint=f'fp_radar_b_{int(time.time())}', ip_city='北京', ip_range='110.123.45.0/24')
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'RadarA', 'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
|
||||
await ws_b.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'RadarB', 'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
|
||||
|
||||
for _ in range(3):
|
||||
await asyncio.wait_for(ws_a.recv(), timeout=10)
|
||||
await asyncio.wait_for(ws_b.recv(), timeout=10)
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'radar-scan', 'data': {'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'radar-devices':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, 'No radar-devices response'
|
||||
devices = response.get('devices', [])
|
||||
assert len(devices) > 0, f'No devices found in same network scan'
|
||||
same_net = [d for d in devices if d.get('matchType') == 'same-network']
|
||||
assert len(same_net) > 0, f'No same-network devices found: {devices}'
|
||||
print(f' [PASS] test_radar_scan_same_network: found {len(same_net)} same-network device(s)')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_radar_scan_no_match():
|
||||
"""场景3: 无匹配设备"""
|
||||
ws = await ws_connect()
|
||||
await register_device(ws, f'dev-radar-nomatch-{int(time.time())}', alias='LonelyDevice', fingerprint=f'fp_lonely_{int(time.time())}')
|
||||
|
||||
await ws.send(json.dumps({'type': 'radar-scan', 'data': {'ipCity': '火星', 'ipRange': '0.0.0.0/24'}}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'radar-devices':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, 'No radar-devices response'
|
||||
devices = response.get('devices', [])
|
||||
same_network = [d for d in devices if d.get('matchType') == 'same-network']
|
||||
same_city = [d for d in devices if d.get('matchType') == 'same-city']
|
||||
assert len(same_network) == 0 and len(same_city) == 0, f'Should find no same-network/city devices: {response}'
|
||||
print(f' [PASS] test_radar_scan_no_match (found {len(devices)} remote devices, 0 local)')
|
||||
await ws.close()
|
||||
|
||||
async def test_radar_broadcast_update():
|
||||
"""场景4: 重复广播更新"""
|
||||
ws = await ws_connect()
|
||||
await register_device(ws, f'dev-radar-update-{int(time.time())}', alias='UpdateDevice', fingerprint=f'fp_update_{int(time.time())}')
|
||||
|
||||
await ws.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'OldName', 'ipCity': '上海'}}))
|
||||
for _ in range(3):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'radar-broadcast':
|
||||
break
|
||||
|
||||
await ws.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'NewName', 'ipCity': '北京'}}))
|
||||
response = None
|
||||
for _ in range(3):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if msg.get('type') == 'radar-broadcast':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None and response.get('success') == True, f'Update broadcast failed: {response}'
|
||||
print(f' [PASS] test_radar_broadcast_update')
|
||||
await ws.close()
|
||||
|
||||
async def test_radar_scan_my_device():
|
||||
"""场景5: 同userId设备发现"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
uid = f'shared-user-{int(time.time())}'
|
||||
|
||||
await register_device(ws_a, f'dev-radar-ua-{int(time.time())}', alias='MyPhone', fingerprint=f'fp_ua_{int(time.time())}', user_id=uid, ip_city='北京', ip_range='110.123.45.0/24')
|
||||
await register_device(ws_b, f'dev-radar-ub-{int(time.time())}', alias='MyPad', fingerprint=f'fp_ub_{int(time.time())}', user_id=uid, ip_city='上海', ip_range='114.114.0.0/24')
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'MyPhone', 'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
|
||||
await ws_b.send(json.dumps({'type': 'radar-broadcast', 'data': {'alias': 'MyPad', 'ipCity': '上海', 'ipRange': '114.114.0.0/24'}}))
|
||||
|
||||
for _ in range(3):
|
||||
await asyncio.wait_for(ws_a.recv(), timeout=10)
|
||||
await asyncio.wait_for(ws_b.recv(), timeout=10)
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'radar-scan', 'data': {'ipCity': '北京', 'ipRange': '110.123.45.0/24'}}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'radar-devices':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, 'No radar-devices response'
|
||||
my_devices = [d for d in response.get('devices', []) if d.get('matchType') == 'my-device']
|
||||
assert len(my_devices) > 0, f'Should find my-device: {response}'
|
||||
print(f' [PASS] test_radar_scan_my_device: found {len(my_devices)} my-device(s)')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
# ===== 主函数 =====
|
||||
|
||||
async def run_all_tests():
|
||||
print('\n===== 配对码测试 =====')
|
||||
await test_pairing_code_create_normal()
|
||||
await test_pairing_code_create_duplicate()
|
||||
await test_pairing_code_join_normal()
|
||||
await test_pairing_code_join_wrong_code()
|
||||
await test_pairing_code_join_self()
|
||||
|
||||
print('\n===== 雷达扫描测试 =====')
|
||||
await test_radar_broadcast_normal()
|
||||
await test_radar_scan_same_network()
|
||||
await test_radar_scan_no_match()
|
||||
await test_radar_broadcast_update()
|
||||
await test_radar_scan_my_device()
|
||||
|
||||
print('\n===== 全部测试通过! =====')
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(run_all_tests())
|
||||
183
docs/toolsapi/scripts/test_full_e2e.py
Normal file
183
docs/toolsapi/scripts/test_full_e2e.py
Normal file
@@ -0,0 +1,183 @@
|
||||
"""
|
||||
闲言APP - 文件传输全流程E2E测试脚本
|
||||
创建时间: 2026-05-19
|
||||
作用: 模拟两个设备从发现→配对→消息→文件→画布→屏幕共享完整流程
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
|
||||
WS_URL = 'wss://tools.wktyl.com:9443'
|
||||
|
||||
async def ws_connect():
|
||||
import websockets
|
||||
return await websockets.connect(WS_URL, ping_interval=None)
|
||||
|
||||
async def register_device(ws, device_id, alias, fingerprint, user_id=None, ip_city=None, ip_range=None):
|
||||
msg = {
|
||||
'type': 'register',
|
||||
'data': {'deviceId': device_id, 'alias': alias, 'deviceType': 'mobile', 'fingerprint': fingerprint, 'protocol': 'xianyan-v1'}
|
||||
}
|
||||
if user_id:
|
||||
msg['data']['userId'] = user_id
|
||||
if ip_city:
|
||||
msg['data']['ipCity'] = ip_city
|
||||
if ip_range:
|
||||
msg['data']['ipRange'] = ip_range
|
||||
await ws.send(json.dumps(msg))
|
||||
for _ in range(5):
|
||||
resp = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if resp.get('type') == 'registered':
|
||||
return resp
|
||||
return None
|
||||
|
||||
async def drain_messages(ws, count=3, timeout=5):
|
||||
messages = []
|
||||
for _ in range(count):
|
||||
try:
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=timeout))
|
||||
messages.append(msg)
|
||||
except asyncio.TimeoutError:
|
||||
break
|
||||
return messages
|
||||
|
||||
async def test_full_e2e():
|
||||
"""全流程: 注册→配对码配对→文本消息→文件元数据→画布同步→屏幕共享"""
|
||||
ts = int(time.time())
|
||||
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
print(' [1/7] 注册设备...')
|
||||
reg_a = await register_device(ws_a, f'e2e-a-{ts}', 'iPhone 16 Pro', f'fp_e2e_a_{ts}', user_id=f'user-e2e-{ts}', ip_city='北京', ip_range='110.123.45.0/24')
|
||||
reg_b = await register_device(ws_b, f'e2e-b-{ts}', 'MacBook Pro', f'fp_e2e_b_{ts}', user_id=f'user-e2e-{ts}', ip_city='北京', ip_range='110.123.45.0/24')
|
||||
assert reg_a is not None and reg_b is not None, 'Registration failed'
|
||||
print(' [PASS] 设备注册成功')
|
||||
|
||||
print(' [2/7] 配对码配对...')
|
||||
await ws_a.send(json.dumps({'type': 'pairing-code-create', 'data': {'alias': 'iPhone 16 Pro'}}))
|
||||
code = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'pairing-code-created':
|
||||
code = msg['pairingCode']
|
||||
break
|
||||
assert code is not None, 'Failed to create pairing code'
|
||||
|
||||
await ws_b.send(json.dumps({'type': 'pairing-code-join', 'data': {'pairingCode': code}}))
|
||||
matched = False
|
||||
for _ in range(5):
|
||||
msg_b = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg_b.get('type') == 'pairing-matched' and msg_b.get('success'):
|
||||
matched = True
|
||||
break
|
||||
for _ in range(5):
|
||||
msg_a = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg_a.get('type') == 'pairing-matched' and msg_a.get('success'):
|
||||
break
|
||||
assert matched, 'Pairing code match failed'
|
||||
print(f' [PASS] 配对码配对成功: {code}')
|
||||
|
||||
print(' [3/7] 文本消息...')
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'textMessage',
|
||||
'to': f'fp_e2e_b_{ts}',
|
||||
'message': {'text': 'Hello from iPhone!'},
|
||||
'timestamp': int(time.time() * 1000)
|
||||
}))
|
||||
received = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'textMessage':
|
||||
received = True
|
||||
assert msg.get('from') == f'fp_e2e_a_{ts}', f'Wrong from: {msg}'
|
||||
break
|
||||
assert received, 'Text message not received'
|
||||
print(' [PASS] 文本消息收发成功')
|
||||
|
||||
print(' [4/7] 文件元数据...')
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'fileMeta',
|
||||
'to': f'fp_e2e_b_{ts}',
|
||||
'file': {'fileName': 'photo.jpg', 'fileSize': 1024000, 'mimeType': 'image/jpeg'},
|
||||
'timestamp': int(time.time() * 1000)
|
||||
}))
|
||||
received = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'fileMeta':
|
||||
received = True
|
||||
break
|
||||
assert received, 'File meta not received'
|
||||
print(' [PASS] 文件元数据传输成功')
|
||||
|
||||
print(' [5/7] 画布同步...')
|
||||
canvas_id = f'e2e-canvas-{ts}'
|
||||
await ws_a.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
await drain_messages(ws_a, 3)
|
||||
await ws_b.send(json.dumps({'type': 'canvas-join', 'payload': {'canvasId': canvas_id}}))
|
||||
await drain_messages(ws_a, 3)
|
||||
await drain_messages(ws_b, 3)
|
||||
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'canvas-stroke',
|
||||
'payload': {'canvasId': canvas_id, 'stroke': {'strokeId': 'e2e-s1', 'points': [[50, 60]], 'color': '#000', 'width': 2, 'tool': 'pen'}}
|
||||
}))
|
||||
received = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'canvas-stroke':
|
||||
received = True
|
||||
break
|
||||
assert received, 'Canvas stroke not received'
|
||||
print(' [PASS] 画布同步成功')
|
||||
|
||||
print(' [6/7] 屏幕共享请求...')
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'screen-share-request',
|
||||
'to': f'fp_e2e_b_{ts}',
|
||||
'payload': {'direction': 'view'}
|
||||
}))
|
||||
received = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'screen-share-request':
|
||||
received = True
|
||||
break
|
||||
assert received, 'Screen share request not received'
|
||||
|
||||
await ws_b.send(json.dumps({
|
||||
'type': 'screen-share-accept',
|
||||
'to': f'fp_e2e_a_{ts}',
|
||||
'payload': {'direction': 'view'}
|
||||
}))
|
||||
received = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'screen-share-accept':
|
||||
received = True
|
||||
break
|
||||
assert received, 'Screen share accept not received'
|
||||
print(' [PASS] 屏幕共享请求/接受成功')
|
||||
|
||||
print(' [7/7] 心跳保活...')
|
||||
await ws_a.send(json.dumps({'type': 'heartbeat', 'data': {'timestamp': int(time.time() * 1000)}}))
|
||||
got_ack = False
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'heartbeat_ack':
|
||||
got_ack = True
|
||||
break
|
||||
assert got_ack, 'Heartbeat ack not received'
|
||||
print(' [PASS] 心跳保活成功')
|
||||
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
print('\n ===== 全流程E2E测试通过! =====')
|
||||
|
||||
async def run_all_tests():
|
||||
print('\n===== 全流程E2E测试 =====')
|
||||
await test_full_e2e()
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(run_all_tests())
|
||||
214
docs/toolsapi/scripts/test_screen_share_signaling.py
Normal file
214
docs/toolsapi/scripts/test_screen_share_signaling.py
Normal file
@@ -0,0 +1,214 @@
|
||||
"""
|
||||
闲言APP - 屏幕共享信令测试脚本
|
||||
创建时间: 2026-05-19
|
||||
作用: 测试屏幕共享请求/接受/拒绝/停止信令
|
||||
每接口3-5个测试场景
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
|
||||
WS_URL = 'wss://tools.wktyl.com:9443'
|
||||
|
||||
async def ws_connect():
|
||||
import websockets
|
||||
return await websockets.connect(WS_URL, ping_interval=None)
|
||||
|
||||
async def register_device(ws, device_id, alias='TestDevice', fingerprint=None):
|
||||
msg = {
|
||||
'type': 'register',
|
||||
'data': {'deviceId': device_id, 'alias': alias, 'deviceType': 'mobile', 'protocol': 'xianyan-v1'}
|
||||
}
|
||||
if fingerprint:
|
||||
msg['data']['fingerprint'] = fingerprint
|
||||
await ws.send(json.dumps(msg))
|
||||
for _ in range(10):
|
||||
response = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
|
||||
if response.get('type') == 'registered':
|
||||
return response
|
||||
return response
|
||||
|
||||
async def test_screen_share_request_view():
|
||||
"""场景1: 请求看对方屏幕"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_ss_view_a_{int(time.time())}'
|
||||
fp_b = f'fp_ss_view_b_{int(time.time())}'
|
||||
reg_a = await register_device(ws_a, f'dev-ss-view-a-{int(time.time())}', alias='Viewer', fingerprint=fp_a)
|
||||
reg_b = await register_device(ws_b, f'dev-ss-view-b-{int(time.time())}', alias='Sharer', fingerprint=fp_b)
|
||||
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'screen-share-request',
|
||||
'to': fp_b,
|
||||
'payload': {'direction': 'view'}
|
||||
}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'screen-share-request':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, 'No screen-share-request received'
|
||||
assert response.get('direction') == 'view', f'Wrong direction: {response}'
|
||||
assert response.get('from') == fp_a, f'Wrong from: {response}'
|
||||
print(f' [PASS] test_screen_share_request_view')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_screen_share_request_share():
|
||||
"""场景2: 请求让对方看我屏幕"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_ss_share_a_{int(time.time())}'
|
||||
fp_b = f'fp_ss_share_b_{int(time.time())}'
|
||||
await register_device(ws_a, f'dev-ss-share-a-{int(time.time())}', alias='Sharer', fingerprint=fp_a)
|
||||
await register_device(ws_b, f'dev-ss-share-b-{int(time.time())}', alias='Viewer', fingerprint=fp_b)
|
||||
|
||||
await ws_a.send(json.dumps({
|
||||
'type': 'screen-share-request',
|
||||
'to': fp_b,
|
||||
'payload': {'direction': 'share'}
|
||||
}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'screen-share-request':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, 'No screen-share-request received'
|
||||
assert response.get('direction') == 'share', f'Wrong direction: {response}'
|
||||
print(f' [PASS] test_screen_share_request_share')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_screen_share_accept():
|
||||
"""场景3: 正常接受屏幕共享"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_ss_acc_a_{int(time.time())}'
|
||||
fp_b = f'fp_ss_acc_b_{int(time.time())}'
|
||||
await register_device(ws_a, f'dev-ss-acc-a-{int(time.time())}', alias='Requester', fingerprint=fp_a)
|
||||
await register_device(ws_b, f'dev-ss-acc-b-{int(time.time())}', alias='Accepter', fingerprint=fp_b)
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'screen-share-request', 'to': fp_b, 'payload': {'direction': 'view'}}))
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'screen-share-request':
|
||||
break
|
||||
|
||||
await ws_b.send(json.dumps({
|
||||
'type': 'screen-share-accept',
|
||||
'to': fp_a,
|
||||
'payload': {'direction': 'view'}
|
||||
}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'screen-share-accept':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, 'No screen-share-accept received'
|
||||
assert response.get('from') == fp_b, f'Wrong from: {response}'
|
||||
print(f' [PASS] test_screen_share_accept')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_screen_share_reject():
|
||||
"""场景4: 拒绝屏幕共享"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_ss_rej_a_{int(time.time())}'
|
||||
fp_b = f'fp_ss_rej_b_{int(time.time())}'
|
||||
await register_device(ws_a, f'dev-ss-rej-a-{int(time.time())}', alias='Requester', fingerprint=fp_a)
|
||||
await register_device(ws_b, f'dev-ss-rej-b-{int(time.time())}', alias='Rejecter', fingerprint=fp_b)
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'screen-share-request', 'to': fp_b, 'payload': {'direction': 'view'}}))
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'screen-share-request':
|
||||
break
|
||||
|
||||
await ws_b.send(json.dumps({'type': 'screen-share-reject', 'to': fp_a}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_a.recv(), timeout=10))
|
||||
if msg.get('type') == 'screen-share-reject':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, 'No screen-share-reject received'
|
||||
print(f' [PASS] test_screen_share_reject')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_screen_share_stop():
|
||||
"""场景5: 停止屏幕共享"""
|
||||
ws_a = await ws_connect()
|
||||
ws_b = await ws_connect()
|
||||
|
||||
fp_a = f'fp_ss_stop_a_{int(time.time())}'
|
||||
fp_b = f'fp_ss_stop_b_{int(time.time())}'
|
||||
await register_device(ws_a, f'dev-ss-stop-a-{int(time.time())}', alias='Stopper', fingerprint=fp_a)
|
||||
await register_device(ws_b, f'dev-ss-stop-b-{int(time.time())}', alias='Other', fingerprint=fp_b)
|
||||
|
||||
await ws_a.send(json.dumps({'type': 'screen-share-stop', 'to': fp_b}))
|
||||
|
||||
response = None
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws_b.recv(), timeout=10))
|
||||
if msg.get('type') == 'screen-share-stop':
|
||||
response = msg
|
||||
break
|
||||
|
||||
assert response is not None, 'No screen-share-stop received'
|
||||
print(f' [PASS] test_screen_share_stop')
|
||||
await ws_a.close()
|
||||
await ws_b.close()
|
||||
|
||||
async def test_screen_share_request_offline():
|
||||
"""场景6: 对方离线时请求"""
|
||||
ws = await ws_connect()
|
||||
await register_device(ws, f'dev-ss-offline-{int(time.time())}', alias='Alone', fingerprint=f'fp_offline_{int(time.time())}')
|
||||
|
||||
await ws.send(json.dumps({
|
||||
'type': 'screen-share-request',
|
||||
'to': 'nonexistent-fingerprint-xxx',
|
||||
'payload': {'direction': 'view'}
|
||||
}))
|
||||
|
||||
got_error = False
|
||||
try:
|
||||
for _ in range(5):
|
||||
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=5))
|
||||
if msg.get('type') == 'screen-share-response' and msg.get('success') == False:
|
||||
got_error = True
|
||||
break
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
|
||||
print(f' [PASS] test_screen_share_request_offline (error_response={got_error})')
|
||||
await ws.close()
|
||||
|
||||
async def run_all_tests():
|
||||
print('\n===== 屏幕共享信令测试 =====')
|
||||
await test_screen_share_request_view()
|
||||
await test_screen_share_request_share()
|
||||
await test_screen_share_accept()
|
||||
await test_screen_share_reject()
|
||||
await test_screen_share_stop()
|
||||
await test_screen_share_request_offline()
|
||||
print('\n===== 全部测试通过! =====')
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(run_all_tests())
|
||||
34
docs/toolsapi/scripts/upload_agreements.py
Normal file
34
docs/toolsapi/scripts/upload_agreements.py
Normal file
@@ -0,0 +1,34 @@
|
||||
"""Upload agreement HTML pages to tools.wktyl.com/agreements/"""
|
||||
import paramiko
|
||||
import os
|
||||
|
||||
HOST = '123.207.67.197'
|
||||
USER = 'root'
|
||||
PASS = '520Kiss123'
|
||||
REMOTE_BASE = '/www/wwwroot/tools.wktyl.com/public/agreements'
|
||||
LOCAL_DIR = r'e:\project\flutter\f\xianyan\docs\toolsapi\agreements'
|
||||
|
||||
ssh = paramiko.SSHClient()
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
ssh.connect(HOST, username=USER, password=PASS)
|
||||
|
||||
sftp = ssh.open_sftp()
|
||||
|
||||
try:
|
||||
sftp.mkdir(REMOTE_BASE)
|
||||
print(f'Created directory: {REMOTE_BASE}')
|
||||
except IOError:
|
||||
print(f'Directory already exists: {REMOTE_BASE}')
|
||||
|
||||
for filename in os.listdir(LOCAL_DIR):
|
||||
if filename.endswith('.html'):
|
||||
local_path = os.path.join(LOCAL_DIR, filename)
|
||||
remote_path = f'{REMOTE_BASE}/{filename}'
|
||||
print(f'Uploading {filename}...')
|
||||
sftp.put(local_path, remote_path)
|
||||
print(f' -> {remote_path}')
|
||||
|
||||
sftp.close()
|
||||
ssh.close()
|
||||
print('\nAll files uploaded successfully!')
|
||||
print(f'Visit: https://tools.wktyl.com/agreements/')
|
||||
@@ -21,6 +21,10 @@ print(f'Uploading index.js to {remote_file}...')
|
||||
backup_path = REMOTE_BASE + '/index.js.bak'
|
||||
try:
|
||||
sftp.stat(remote_file)
|
||||
try:
|
||||
sftp.remove(backup_path)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
sftp.rename(remote_file, backup_path)
|
||||
print(f'Backup created: {backup_path}')
|
||||
except FileNotFoundError:
|
||||
|
||||
Reference in New Issue
Block a user