This commit is contained in:
Developer
2026-05-12 06:28:35 +08:00
parent 283950ea07
commit 2c1a87e7c5
10 changed files with 87 additions and 1881 deletions

View File

@@ -1,35 +0,0 @@
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('123.207.67.197', 22, 'root', '520Kiss123')
print('=== Create test file ===')
stdin, stdout, stderr = ssh.exec_command('echo "test content 12345" > /tmp/test_upload.txt && cat /tmp/test_upload.txt')
print(stdout.read().decode())
print('=== Test upload with verbose ===')
stdin, stdout, stderr = ssh.exec_command(
'curl -sv -X POST "https://tools.wktyl.com/api/cloud_cache/upload" '
'-F "file=@/tmp/test_upload.txt" '
'-F "fromId=device_test" '
'-F "toId=device_test2" '
'-F "fileName=test.txt" '
'-F "fileSize=18" '
'-F "mimeType=text/plain" 2>&1'
)
print(stdout.read().decode())
print('=== Check PHP error logs ===')
stdin, stdout, stderr = ssh.exec_command(
'find /www/wwwroot/tools.wktyl.com/runtime -name "*.log" -mmin -5 -exec tail -20 {} \\; 2>/dev/null'
)
print(stdout.read().decode())
print('=== Check nginx error log ===')
stdin, stdout, stderr = ssh.exec_command(
'tail -10 /www/server/nginx/logs/tools.wktyl.com.log 2>/dev/null || tail -10 /var/log/nginx/error.log 2>/dev/null || echo "no nginx log"'
)
print(stdout.read().decode())
ssh.close()

View File

@@ -1,97 +0,0 @@
#!/usr/bin/env python3
"""
============================================================
闲言工具箱 — IP归属地修复部署脚本
创建时间: 2026-05-12
作用: 上传修复ip_city为空的UserSecurity.php到服务器
============================================================
"""
import paramiko
import time
HOST = '123.207.67.197'
PORT = 22
USER = 'root'
PASS = '520Kiss123'
REMOTE_APP_PATH = '/www/wwwroot/tools.wktyl.com/application'
LOCAL_FILE = r'e:\project\flutter\f\xianyan\docs\toolsapi\application\api\controller\UserSecurity.php'
REMOTE_FILE = f'{REMOTE_APP_PATH}/api/controller/UserSecurity.php'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOST, PORT, USER, PASS)
sftp = ssh.open_sftp()
# 备份
timestamp = time.strftime('%Y%m%d_%H%M%S')
backup = f"{REMOTE_FILE}.bak.{timestamp}"
try:
sftp.rename(REMOTE_FILE, backup)
print(f"✅ 备份: {backup}")
except:
print("⚠️ 无需备份")
# 上传
sftp.put(LOCAL_FILE, REMOTE_FILE)
sftp.chmod(REMOTE_FILE, 0o644)
print("✅ 上传: UserSecurity.php (含queryIpLocation修复)")
# 清理缓存
ssh.exec_command(f'rm -rf {REMOTE_APP_PATH}/../runtime/cache/* {REMOTE_APP_PATH}/../runtime/temp/*')
print("✅ 缓存已清理")
# 验证文件
stat = sftp.stat(REMOTE_FILE)
print(f"✅ 文件大小: {stat.st_size} bytes")
sftp.close()
# 测试: 登录后检查ip_city
print("\n--- 测试: 登录后ip_city是否自动填充 ---")
import requests as req
session = req.Session()
session.headers.update({
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
})
resp = session.post('https://tools.wktyl.com/api/user_security/login', data={
'account': 'apitest_user',
'password': '123456',
}, timeout=15)
login_data = resp.json()
if login_data.get('code') == 1:
token = login_data['data']['userinfo']['token']
user_id = login_data['data']['userinfo']['id']
print(f"✅ 登录成功 (user_id={user_id})")
# 查询设备列表
resp2 = session.get(
'https://tools.wktyl.com/api/user_center/myDevices',
headers={'token': token},
timeout=15,
)
devices_data = resp2.json()
if devices_data.get('code') == 1:
devices = devices_data.get('data', {}).get('devices', [])
for d in devices:
ip_city = d.get('ip_city', '')
ip_range = d.get('ip_range', '')
device_name = d.get('device_name', '')
print(f" 设备: {device_name} | ip_city={ip_city} | ip_range={ip_range}")
if ip_city:
print(" ✅ ip_city已自动填充!")
else:
print(" ⚠️ ip_city仍为空")
else:
print(f" ❌ 设备列表查询失败: {devices_data.get('msg')}")
else:
print(f"❌ 登录失败: {login_data.get('msg')}")
ssh.close()
print("\n部署完成!")

View File

@@ -1,161 +0,0 @@
#!/usr/bin/env python3
"""
============================================================
闲言工具箱 — 注销功能部署脚本
创建时间: 2026-05-12
作用: 上传注销接口相关PHP文件到服务器并执行SQL迁移
上次更新: v9.2.0 新建
============================================================
"""
import paramiko
import os
import sys
import time
HOST = '123.207.67.197'
PORT = 22
USER = 'root'
PASS = '520Kiss123'
REMOTE_APP_PATH = '/www/wwwroot/tools.wktyl.com/application'
FILES_TO_UPLOAD = [
{
'local': r'e:\project\flutter\f\xianyan\docs\toolsapi\application\api\controller\UserSecurity.php',
'remote': f'{REMOTE_APP_PATH}/api/controller/UserSecurity.php',
'desc': 'UserSecurity.php (新增requestDeletion/deletionStatus/cancelDeletion)',
},
{
'local': r'e:\project\flutter\f\xianyan\docs\toolsapi\application\api\controller\User.php',
'remote': f'{REMOTE_APP_PATH}/api/controller/User.php',
'desc': 'User.php (新增注销方法转发)',
},
{
'local': r'e:\project\flutter\f\xianyan\docs\toolsapi\application\route.php',
'remote': f'{REMOTE_APP_PATH}/route.php',
'desc': 'route.php (新增注销路由)',
},
]
SQL_MIGRATION = r'e:\project\flutter\f\xianyan\docs\toolsapi\application\admin\command\Install\migrate_v9_2.sql'
def main():
print("=" * 60)
print("闲言工具箱 — 注销功能部署")
print("=" * 60)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOST, PORT, USER, PASS)
sftp = ssh.open_sftp()
# 1. 上传PHP文件
print("\n--- 上传PHP文件 ---")
for item in FILES_TO_UPLOAD:
local = item['local']
remote = item['remote']
desc = item['desc']
if not os.path.exists(local):
print(f"❌ 本地文件不存在: {local}")
continue
timestamp = time.strftime('%Y%m%d_%H%M%S')
backup = f"{remote}.bak.{timestamp}"
try:
sftp.rename(remote, backup)
print(f" 备份: {remote} -> {backup}")
except:
print(f" 无需备份(文件不存在): {remote}")
sftp.put(local, remote)
sftp.chmod(remote, 0o644)
print(f" ✅ 上传: {desc}")
# 2. 上传SQL迁移文件
print("\n--- 上传SQL迁移文件 ---")
if os.path.exists(SQL_MIGRATION):
remote_sql = f'{REMOTE_APP_PATH}/admin/command/Install/migrate_v9_2.sql'
sftp.put(SQL_MIGRATION, remote_sql)
sftp.chmod(remote_sql, 0o644)
print(f" ✅ 上传: migrate_v9_2.sql")
else:
print(f" ❌ SQL文件不存在: {SQL_MIGRATION}")
# 3. 执行SQL迁移
print("\n--- 执行SQL迁移(创建user_deletion表) ---")
sql_content = open(SQL_MIGRATION, 'r', encoding='utf-8').read()
sql_statements = [s.strip() for s in sql_content.split(';') if s.strip() and not s.strip().startswith('--')]
for sql in sql_statements:
if 'CREATE TABLE' in sql:
check_cmd = "mysql -u tools -ptools tools -e \"SHOW TABLES LIKE 'fa_user_deletion'\" 2>/dev/null"
stdin, stdout, stderr = ssh.exec_command(check_cmd)
result = stdout.read().decode().strip()
if result:
print(" ⚠️ fa_user_deletion表已存在跳过创建")
else:
full_sql = sql + ';'
mysql_cmd = f"mysql -u tools -ptools tools -e \"{full_sql.replace('\"', '\\\"')}\" 2>/dev/null"
stdin, stdout, stderr = ssh.exec_command(mysql_cmd)
err = stderr.read().decode().strip()
if err:
print(f" ❌ SQL执行错误: {err}")
else:
print(" ✅ fa_user_deletion表创建成功")
# 4. 清理PHP缓存
print("\n--- 清理PHP缓存 ---")
stdin, stdout, stderr = ssh.exec_command(f'rm -rf {REMOTE_APP_PATH}/../runtime/cache/* {REMOTE_APP_PATH}/../runtime/temp/*')
print(" ✅ 缓存已清理")
# 5. 验证文件
print("\n--- 验证上传文件 ---")
for item in FILES_TO_UPLOAD:
remote = item['remote']
try:
stat = sftp.stat(remote)
print(f"{item['desc']} ({stat.st_size} bytes)")
except:
print(f" ❌ 文件不存在: {remote}")
sftp.close()
# 6. 快速接口检测
print("\n--- 接口可用性检测 ---")
import requests as req
test_urls = [
('POST', '/api/user_security/requestDeletion', '注销申请接口'),
('GET', '/api/user_security/deletionStatus', '注销状态接口'),
('POST', '/api/user_security/cancelDeletion', '取消注销接口'),
]
for method, path, desc in test_urls:
url = f'https://tools.wktyl.com{path}'
try:
if method == 'GET':
r = req.get(url, timeout=10)
else:
r = req.post(url, timeout=10)
if r.status_code == 404:
print(f"{desc}: 404 未找到")
elif r.status_code == 200:
data = r.json()
if data.get('code') == -1:
print(f"{desc}: 接口存在(需登录)")
else:
print(f"{desc}: 接口正常(code={data.get('code')})")
else:
print(f" ⚠️ {desc}: HTTP {r.status_code}")
except Exception as e:
print(f"{desc}: 请求失败 - {e}")
ssh.close()
print("\n" + "=" * 60)
print("部署完成!")
print("=" * 60)
if __name__ == "__main__":
main()

View File

@@ -1,34 +0,0 @@
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('123.207.67.197', 22, 'root', '520Kiss123')
print('=== Fix directory permissions ===')
stdin, stdout, stderr = ssh.exec_command(
'chown -R www:www /www/wwwroot/tools.wktyl.com/runtime/cloud_cache && '
'chmod -R 755 /www/wwwroot/tools.wktyl.com/runtime/cloud_cache && '
'ls -la /www/wwwroot/tools.wktyl.com/runtime/ | grep cloud'
)
print(stdout.read().decode())
print(stderr.read().decode())
print('=== Check web server user ===')
stdin, stdout, stderr = ssh.exec_command(
'ps aux | grep -E "nginx|php-fpm" | head -5'
)
print(stdout.read().decode())
print('=== Test upload again ===')
stdin, stdout, stderr = ssh.exec_command(
'curl -s -X POST "https://tools.wktyl.com/api/cloud_cache/upload" '
'-F "file=@/tmp/test_upload.txt" '
'-F "fromId=device_test" '
'-F "toId=device_test2" '
'-F "fileName=test.txt" '
'-F "fileSize=18" '
'-F "mimeType=text/plain"'
)
print(stdout.read().decode())
ssh.close()

View File

@@ -1,48 +0,0 @@
#!/usr/bin/env python3
import paramiko
import time
HOST = '123.207.67.197'
PORT = 22
USER = 'root'
PASS = '520Kiss123'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOST, PORT, USER, PASS)
print("--- 上传SQL文件 ---")
sftp = ssh.open_sftp()
local_sql = r'e:\project\flutter\f\xianyan\docs\toolsapi\application\admin\command\Install\migrate_v9_2.sql'
remote_sql = '/tmp/migrate_v9_2.sql'
sftp.put(local_sql, remote_sql)
sftp.chmod(remote_sql, 0o644)
sftp.close()
print("✅ SQL文件已上传到 /tmp/migrate_v9_2.sql")
print("\n--- 执行SQL迁移 ---")
stdin, stdout, stderr = ssh.exec_command('mysql -u tools -ptools tools < /tmp/migrate_v9_2.sql 2>&1')
out = stdout.read().decode().strip()
err = stderr.read().decode().strip()
if out:
print(f"输出: {out}")
if err:
print(f"错误: {err}")
if not out and not err:
print("✅ SQL执行成功(无输出=成功)")
print("\n--- 验证表结构 ---")
stdin, stdout, stderr = ssh.exec_command("mysql -u tools -ptools tools -e 'DESC fa_user_deletion' 2>/dev/null")
result = stdout.read().decode().strip()
if result:
print(result)
print("✅ fa_user_deletion表创建成功")
else:
print("❌ 表不存在")
print("\n--- 清理临时文件 ---")
ssh.exec_command('rm -f /tmp/migrate_v9_2.sql')
print("✅ 临时文件已清理")
ssh.close()
print("\n完成!")

View File

@@ -1,218 +0,0 @@
import paramiko
import json
import time
import os
HOST = '123.207.67.197'
PORT = 22
USER = 'root'
PASS = '520Kiss123'
BASE_URL = 'https://tools.wktyl.com/api/cloud_cache'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOST, PORT, USER, PASS)
def run_curl(cmd, label=''):
print(f'\n{"="*60}')
print(f'TEST: {label}')
print(f'CMD: {cmd[:120]}...')
stdin, stdout, stderr = ssh.exec_command(cmd)
out = stdout.read().decode()
err = stderr.read().decode()
print(f'RESPONSE: {out[:500]}')
if err:
print(f'ERROR: {err[:200]}')
return out
passed = 0
failed = 0
def check(label, condition):
global passed, failed
if condition:
passed += 1
print(f' ✅ PASS: {label}')
else:
failed += 1
print(f' ❌ FAIL: {label}')
# ─── Test 1: Install table ───────────────────────────────
result = run_curl(f'curl -s -X POST "{BASE_URL}/install"', 'Install database table')
try:
data = json.loads(result)
check('Install returns success', data.get('code') == 1)
except:
check('Install returns valid JSON', False)
# ─── Test 2: Upload file (anonymous) ─────────────────────
test_file = '/tmp/test_cloud_cache_upload.txt'
ssh.exec_command(f'echo "This is a test file for cloud cache - {int(time.time())}" > {test_file}')
time.sleep(0.5)
result = run_curl(
f'curl -s -X POST "{BASE_URL}/upload" '
f'-F "file=@{test_file}" '
f'-F "fromId=device_test_sender" '
f'-F "toId=device_test_receiver" '
f'-F "fileName=test_upload.txt" '
f'-F "fileSize=100" '
f'-F "mimeType=text/plain" '
f'-F "expireHours=24"',
'Upload file (anonymous)'
)
cache_id = None
try:
data = json.loads(result)
check('Upload returns success', data.get('code') == 1)
cache_id = data.get('data', {}).get('cacheId', '')
check('Upload returns cacheId', len(cache_id) > 0)
check('Upload returns expiresAt', data.get('data', {}).get('expiresAt') is not None)
except Exception as e:
check('Upload returns valid JSON', False)
print(f' Exception: {e}')
# ─── Test 3: Upload dangerous file type ──────────────────
danger_file = '/tmp/test_malicious.php'
ssh.exec_command(f'echo "<?php echo \\"hack\\"; ?>" > {danger_file}')
time.sleep(0.5)
result = run_curl(
f'curl -s -X POST "{BASE_URL}/upload" '
f'-F "file=@{danger_file}" '
f'-F "fromId=device_test" '
f'-F "toId=device_test2" '
f'-F "fileName=malicious.php" '
f'-F "mimeType=application/x-php"',
'Upload dangerous file type (should be rejected)'
)
try:
data = json.loads(result)
check('Dangerous file rejected', data.get('code') != 1)
check('Error mentions unsupported type', '不支持' in data.get('msg', '') or '类型' in data.get('msg', ''))
except:
check('Dangerous file rejection returns valid JSON', False)
# ─── Test 4: Upload oversized file (anonymous > 10MB) ────
big_file = '/tmp/test_big_file.bin'
ssh.exec_command(f'dd if=/dev/zero of={big_file} bs=1M count=11 2>/dev/null')
time.sleep(1)
result = run_curl(
f'curl -s -X POST "{BASE_URL}/upload" '
f'-F "file=@{big_file}" '
f'-F "fromId=device_test" '
f'-F "toId=device_test2" '
f'-F "fileName=big_file.bin"',
'Upload oversized file >10MB anonymous (should be rejected)'
)
try:
data = json.loads(result)
check('Oversized file rejected', data.get('code') != 1)
check('Error mentions size limit', '大小限制' in data.get('msg', '') or '10' in data.get('msg', ''))
except:
check('Oversized rejection returns valid JSON', False)
# ─── Test 5: List files ─────────────────────────────────
result = run_curl(
f'curl -s -X GET "{BASE_URL}/list?userId=device_test_receiver"',
'List files for receiver'
)
try:
data = json.loads(result)
check('List returns success', data.get('code') == 1)
items = data.get('data', [])
check('List returns array', isinstance(items, list))
if cache_id and isinstance(items, list):
found = any(item.get('cacheId') == cache_id for item in items)
check('List contains uploaded file', found)
except:
check('List returns valid JSON', False)
# ─── Test 6: Get file info ──────────────────────────────
if cache_id:
result = run_curl(
f'curl -s -X GET "{BASE_URL}/info?cacheId={cache_id}"',
'Get file info'
)
try:
data = json.loads(result)
check('Info returns success', data.get('code') == 1)
check('Info returns correct cacheId', data.get('data', {}).get('cacheId') == cache_id)
check('Info returns fileName', data.get('data', {}).get('fileName') is not None)
except:
check('Info returns valid JSON', False)
# ─── Test 7: Download file ──────────────────────────────
if cache_id:
result = run_curl(
f'curl -s -o /tmp/test_download.txt -w "%{{http_code}}" '
f'"{BASE_URL}/download?cacheId={cache_id}&deviceId=device_test_receiver"',
'Download file'
)
check('Download returns HTTP 200', '200' in result)
stdin, stdout, stderr = ssh.exec_command('cat /tmp/test_download.txt')
dl_content = stdout.read().decode()
check('Downloaded file has content', len(dl_content) > 0)
# ─── Test 8: Notify ─────────────────────────────────────
result = run_curl(
f'curl -s -X POST "{BASE_URL}/notify" '
f'-H "Content-Type: application/json" '
f'-d \'{{"cacheId":"test_notify_id","toId":"device_test_receiver","fromId":"device_test_sender","fileName":"test.txt"}}\'',
'Send notification'
)
try:
data = json.loads(result)
check('Notify returns success', data.get('code') == 1)
except:
check('Notify returns valid JSON', False)
# ─── Test 9: Delete file ────────────────────────────────
if cache_id:
result = run_curl(
f'curl -s -X DELETE "{BASE_URL}/delete" '
f'-H "Content-Type: application/json" '
f'-d \'{{"cacheId":"{cache_id}","deviceId":"device_test_sender"}}\'',
'Delete file'
)
try:
data = json.loads(result)
check('Delete returns success', data.get('code') == 1)
except:
check('Delete returns valid JSON', False)
result = run_curl(
f'curl -s -X GET "{BASE_URL}/info?cacheId={cache_id}"',
'Verify file deleted'
)
try:
data = json.loads(result)
check('Deleted file returns error', data.get('code') != 1)
except:
check('Deleted file check returns valid JSON', False)
# ─── Test 10: Clean expired ─────────────────────────────
result = run_curl(
f'curl -s -X POST "{BASE_URL}/clean"',
'Clean expired files'
)
try:
data = json.loads(result)
check('Clean returns success', data.get('code') == 1)
except:
check('Clean returns valid JSON', False)
# ─── Cleanup ────────────────────────────────────────────
ssh.exec_command(f'rm -f {test_file} {danger_file} {big_file} /tmp/test_download.txt')
ssh.close()
print(f'\n{"="*60}')
print(f'TEST RESULTS: {passed} passed, {failed} failed, {passed + failed} total')
if failed == 0:
print('🎉 ALL TESTS PASSED!')
else:
print(f'⚠️ {failed} test(s) failed')

View File

@@ -1,222 +0,0 @@
import paramiko
import json
import time
import hashlib
import sys
HOST = '123.207.67.197'
PORT = 22
USER = 'root'
PASS = '520Kiss123'
WS_PORT = 3001
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOST, PORT, USER, PASS)
print('=== 接口测试: 信令服务器 v11.0.0 ===\n')
test_results = []
def run_test(name, command):
print(f'\n--- 测试: {name} ---')
stdin, stdout, stderr = ssh.exec_command(command)
out = stdout.read().decode().strip()
err = stderr.read().decode().strip()
print(f'输出: {out[:500]}')
if err:
print(f'错误: {err[:300]}')
success = 'error' not in out.lower() and 'fail' not in out.lower() and not err
test_results.append((name, success))
return out, err
print('1. 检查服务运行状态')
out, err = run_test('PM2 Status', 'pm2 describe signaling | grep status')
if 'online' in out:
print(' ✅ 信令服务在线')
else:
print(' ❌ 信令服务离线')
print('\n2. 检查WebSocket端口监听')
out, err = run_test('Port Check', f'ss -tlnp | grep {WS_PORT}')
if str(WS_PORT) in out:
print(f' ✅ 端口 {WS_PORT} 正在监听')
else:
print(f' ❌ 端口 {WS_PORT} 未监听')
print('\n3. 测试WebSocket连接 + register')
node_test = f'''
const WebSocket = require("ws");
const ws = new WebSocket("ws://127.0.0.1:{WS_PORT}");
let results = [];
ws.on("open", () => {{
ws.send(JSON.stringify({{
type: "register",
payload: {{ userId: "test_user_001", alias: "TestDevice", platform: "test" }}
}}));
}});
ws.on("message", (data) => {{
const msg = JSON.parse(data);
results.push(msg);
if (msg.type === "registered") {{
console.log("REGISTER OK: " + JSON.stringify(msg));
ws.send(JSON.stringify({{
type: "discoverMyDevices",
payload: {{ userId: "test_user_001" }}
}}));
}}
if (msg.type === "myDevicesResponse") {{
console.log("DISCOVER OK: " + JSON.stringify(msg));
ws.send(JSON.stringify({{
type: "pair-request",
to: "nonexistent_peer",
fingerprint: "test_fp_123"
}}));
}}
if (msg.type === "pair-response") {{
console.log("PAIR-REQUEST ROUTED: " + JSON.stringify(msg));
ws.send(JSON.stringify({{
type: "heartbeat"
}}));
console.log("HEARTBEAT OK");
ws.close();
}}
if (msg.type === "pong") {{
console.log("PONG OK");
}}
}});
ws.on("error", (err) => {{
console.log("WS ERROR: " + err.message);
}});
setTimeout(() => {{
console.log("TIMEOUT - closing");
ws.close();
process.exit(0);
}}, 5000);
'''
out, err = run_test('WebSocket + Register + Discover + Pair', f'cd /www/wwwroot/tools.wktyl.com/signaling && node -e \'{node_test.replace(chr(10), " ").replace(chr(13), "")}\'')
print('\n4. 测试通用消息转发 (delivery-ack)')
node_test_relay = f'''
const WebSocket = require("ws");
const ws1 = new WebSocket("ws://127.0.0.1:{WS_PORT}");
const ws2 = new WebSocket("ws://127.0.0.1:{WS_PORT}");
let peer1Id, peer2Id;
ws1.on("open", () => {{
ws1.send(JSON.stringify({{ type: "register", payload: {{ userId: "test_relay_user" }} }}));
}});
ws1.on("message", (data) => {{
const msg = JSON.parse(data);
if (msg.type === "registered") peer1Id = msg.id;
}});
ws2.on("open", () => {{
ws2.send(JSON.stringify({{ type: "register", payload: {{ userId: "test_relay_user" }} }}));
}});
ws2.on("message", (data) => {{
const msg = JSON.parse(data);
if (msg.type === "registered") {{
peer2Id = msg.id;
setTimeout(() => {{
ws1.send(JSON.stringify({{
type: "delivery-ack",
to: peer2Id,
messageId: "test_msg_001",
status: "delivered"
}}));
}}, 500);
}}
if (msg.type === "delivery-ack") {{
console.log("DELIVERY-ACK FORWARDED: " + JSON.stringify(msg));
ws1.close();
ws2.close();
process.exit(0);
}}
}});
setTimeout(() => {{
console.log("TIMEOUT");
ws1.close();
ws2.close();
process.exit(0);
}}, 8000);
'''
out, err = run_test('Delivery-Ack Forward', f'cd /www/wwwroot/tools.wktyl.com/signaling && node -e \'{node_test_relay.replace(chr(10), " ").replace(chr(13), "")}\'')
print('\n5. 测试 chunk-ack / resume-request 通用转发')
node_test_chunk = f'''
const WebSocket = require("ws");
const ws1 = new WebSocket("ws://127.0.0.1:{WS_PORT}");
const ws2 = new WebSocket("ws://127.0.0.1:{WS_PORT}");
let peer1Id, peer2Id;
ws1.on("open", () => {{
ws1.send(JSON.stringify({{ type: "register", payload: {{ userId: "test_chunk_user" }} }}));
}});
ws1.on("message", (data) => {{
const msg = JSON.parse(data);
if (msg.type === "registered") peer1Id = msg.id;
}});
ws2.on("open", () => {{
ws2.send(JSON.stringify({{ type: "register", payload: {{ userId: "test_chunk_user" }} }}));
}});
ws2.on("message", (data) => {{
const msg = JSON.parse(data);
if (msg.type === "registered") {{
peer2Id = msg.id;
setTimeout(() => {{
ws1.send(JSON.stringify({{
type: "chunk-ack",
to: peer2Id,
taskId: "test_task_001",
chunkIndex: 5
}}));
}}, 500);
}}
if (msg.type === "chunk-ack") {{
console.log("CHUNK-ACK FORWARDED: " + JSON.stringify(msg));
ws1.send(JSON.stringify({{
type: "resume-request",
to: peer2Id,
taskId: "test_task_001",
missingChunks: [3, 7, 12]
}}));
}}
if (msg.type === "resume-request") {{
console.log("RESUME-REQUEST FORWARDED: " + JSON.stringify(msg));
ws1.close();
ws2.close();
process.exit(0);
}}
}});
setTimeout(() => {{
console.log("TIMEOUT");
ws1.close();
ws2.close();
process.exit(0);
}}, 8000);
'''
out, err = run_test('Chunk-Ack + Resume-Request Forward', f'cd /www/wwwroot/tools.wktyl.com/signaling && node -e \'{node_test_chunk.replace(chr(10), " ").replace(chr(13), "")}\'')
print('\n\n========================================')
print('测试结果汇总:')
print('========================================')
for name, success in test_results:
status = '✅ PASS' if success else '❌ FAIL'
print(f' {status} - {name}')
ssh.close()

View File

@@ -1,234 +0,0 @@
#!/usr/bin/env python3
"""
============================================================
闲言APP — 账号注销API测试脚本
创建时间: 2026-05-12
作用: 测试新版注销接口(requestDeletion/deletionStatus/cancelDeletion)
上次更新: v9.2.0 新建
============================================================
"""
import base64
import hashlib
import hmac
import json
import os
import time
import requests
BASE_URL = "https://tools.wktyl.com"
RECEIPT_SECRET = "Xy7kP9mL2qR4wS8v"
TIMEOUT = 15
TEST_USER = "apitest_user"
TEST_PASS = "123456"
session = requests.Session()
session.headers.update({
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
})
def generate_receipt(action: str, payload: str) -> dict:
ts = int(time.time())
nonce = os.urandom(4).hex()
payload_digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
data = {
"action": action,
"payload": payload_digest,
"ts": ts,
"nonce": nonce,
}
receipt = base64.b64encode(json.dumps(data).encode()).decode()
sig = hmac.new(
RECEIPT_SECRET.encode(),
receipt.encode(),
hashlib.sha256,
).hexdigest()
return {"receipt": receipt, "sig": sig}
def log_test(name: str, success: bool, detail: str = ""):
status = "✅ PASS" if success else "❌ FAIL"
msg = f"{status} | {name}"
if detail:
msg += f"{detail}"
print(msg)
def api_post(path: str, data: dict = None, token: str = None) -> dict:
headers = {}
if token:
headers["token"] = token
resp = session.post(f"{BASE_URL}{path}", data=data, headers=headers, timeout=TIMEOUT)
return resp.json()
def api_get(path: str, params: dict = None, token: str = None) -> dict:
headers = {}
if token:
headers["token"] = token
resp = session.get(f"{BASE_URL}{path}", params=params, headers=headers, timeout=TIMEOUT)
return resp.json()
def main():
print("=" * 60)
print("闲言APP — 账号注销API测试")
print("=" * 60)
passed = 0
failed = 0
# Step 1: 登录获取Token
print("\n--- Step 1: 登录测试账号 ---")
resp = api_post("/api/user_security/login", {
"account": TEST_USER,
"password": TEST_PASS,
})
login_ok = resp.get("code") == 1
log_test("登录", login_ok, resp.get("msg", ""))
if not login_ok:
print("登录失败,终止测试")
return
token = resp["data"]["userinfo"]["token"]
user_id = resp["data"]["userinfo"]["id"]
print(f" Token: {token[:20]}...")
print(f" User ID: {user_id}")
if login_ok:
passed += 1
else:
failed += 1
# Step 2: 查询注销状态(应无申请)
print("\n--- Step 2: 查询注销状态(应无申请) ---")
resp = api_get("/api/user_security/deletionStatus", token=token)
status_ok = resp.get("code") == 1 and resp.get("data", {}).get("has_pending") == False
log_test("查询注销状态(无申请)", status_ok, f"has_pending={resp.get('data', {}).get('has_pending')}")
if status_ok:
passed += 1
else:
failed += 1
# Step 3: 申请注销(无回执,应失败)
print("\n--- Step 3: 申请注销(无回执,应失败) ---")
resp = api_post("/api/user_security/requestDeletion", {
"reason": "测试注销",
}, token=token)
no_receipt_ok = resp.get("code") == 0
log_test("无回执申请注销(应失败)", no_receipt_ok, resp.get("msg", ""))
if no_receipt_ok:
passed += 1
else:
failed += 1
# Step 4: 申请注销(带回执)
print("\n--- Step 4: 申请注销(带回执) ---")
receipt_data = generate_receipt("delete_account", str(user_id))
resp = api_post("/api/user_security/requestDeletion", {
"reason": "测试注销-自动化测试",
"receipt": receipt_data["receipt"],
"sig": receipt_data["sig"],
}, token=token)
apply_ok = resp.get("code") == 1
log_test("申请注销(带回执)", apply_ok, resp.get("msg", ""))
if apply_ok:
passed += 1
data = resp.get("data", {})
print(f" 注销记录ID: {data.get('id')}")
print(f" 状态: {data.get('status_text')}")
print(f" 自动注销时间: {data.get('auto_delete_time_text')}")
print(f" 倒计时: {data.get('countdown')}")
else:
failed += 1
print(f" 响应: {json.dumps(resp, ensure_ascii=False)}")
# Step 5: 重复申请注销(应失败)
print("\n--- Step 5: 重复申请注销(应失败) ---")
receipt_data2 = generate_receipt("delete_account", str(user_id))
resp = api_post("/api/user_security/requestDeletion", {
"reason": "重复申请",
"receipt": receipt_data2["receipt"],
"sig": receipt_data2["sig"],
}, token=token)
dup_ok = resp.get("code") == 0
log_test("重复申请注销(应失败)", dup_ok, resp.get("msg", ""))
if dup_ok:
passed += 1
else:
failed += 1
# Step 6: 查询注销状态(应有申请)
print("\n--- Step 6: 查询注销状态(应有申请) ---")
resp = api_get("/api/user_security/deletionStatus", token=token)
status_ok2 = resp.get("code") == 1 and resp.get("data", {}).get("has_pending") == True
log_test("查询注销状态(有申请)", status_ok2, f"has_pending={resp.get('data', {}).get('has_pending')}, status_text={resp.get('data', {}).get('status_text')}")
if status_ok2:
passed += 1
data = resp.get("data", {})
print(f" 倒计时: {data.get('countdown')}")
else:
failed += 1
# Step 7: 取消注销申请
print("\n--- Step 7: 取消注销申请 ---")
resp = api_post("/api/user_security/cancelDeletion", token=token)
cancel_ok = resp.get("code") == 1
log_test("取消注销申请", cancel_ok, resp.get("msg", ""))
if cancel_ok:
passed += 1
else:
failed += 1
# Step 8: 再次查询注销状态(应无待审核)
print("\n--- Step 8: 查询注销状态(取消后) ---")
resp = api_get("/api/user_security/deletionStatus", token=token)
status_ok3 = resp.get("code") == 1 and resp.get("data", {}).get("has_pending") == False
log_test("查询注销状态(已取消)", status_ok3, f"has_pending={resp.get('data', {}).get('has_pending')}")
if status_ok3:
passed += 1
else:
failed += 1
# Step 9: 取消注销(无申请时,应失败)
print("\n--- Step 9: 取消注销(无申请时,应失败) ---")
resp = api_post("/api/user_security/cancelDeletion", token=token)
no_cancel_ok = resp.get("code") == 0
log_test("无申请时取消(应失败)", no_cancel_ok, resp.get("msg", ""))
if no_cancel_ok:
passed += 1
else:
failed += 1
# Step 10: 旧路径兼容测试
print("\n--- Step 10: 旧路径兼容测试(/api/user/requestDeletion) ---")
receipt_data3 = generate_receipt("delete_account", str(user_id))
resp = api_post("/api/user/requestDeletion", {
"reason": "旧路径测试",
"receipt": receipt_data3["receipt"],
"sig": receipt_data3["sig"],
}, token=token)
old_path_ok = resp.get("code") == 1
log_test("旧路径申请注销", old_path_ok, resp.get("msg", ""))
if old_path_ok:
passed += 1
else:
failed += 1
# 清理: 取消刚才的申请
if old_path_ok:
print("\n--- 清理: 取消测试申请 ---")
resp = api_post("/api/user_security/cancelDeletion", token=token)
log_test("清理-取消申请", resp.get("code") == 1, resp.get("msg", ""))
# 结果汇总
print("\n" + "=" * 60)
total = passed + failed
print(f"测试结果: {passed}/{total} 通过,通过率{passed * 100 // total}%")
print("=" * 60)
if __name__ == "__main__":
main()