LingTropy/lingtropy-client/scripts/update_server.py
2025-07-26 15:56:45 +08:00

160 lines
5.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
import hashlib
import http.server
import socketserver
import shutil
import sys
from datetime import datetime
# 配置
PORT = 3000
UPDATES_DIR = "updates"
CURRENT_VERSION = "1.0.0"
NEW_VERSION = "1.0.4"
# 获取脚本所在目录的绝对路径
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# 设置工作目录为脚本所在目录
os.chdir(SCRIPT_DIR)
def log(message):
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {message}")
log(f"启动更新服务器在端口 {PORT}")
log(f"脚本目录: {SCRIPT_DIR}")
log(f"更新目录: {os.path.join(SCRIPT_DIR, UPDATES_DIR)}")
log(f"当前版本: {CURRENT_VERSION}")
log(f"新版本: {NEW_VERSION}")
# 确保更新目录存在
os.makedirs(UPDATES_DIR, exist_ok=True)
# 生成latest.yml文件
def generate_latest_yml():
log("生成 latest.yml 文件...")
# 假设安装包已经存在
installer_path = os.path.join(UPDATES_DIR, f"LingTropy-{NEW_VERSION}.exe")
# 如果安装包不存在,创建一个假的
if not os.path.exists(installer_path):
log(f"创建测试安装包: {installer_path}")
with open(installer_path, "wb") as f:
f.write(b"This is a fake installer file")
# 计算文件大小和SHA512
file_size = os.path.getsize(installer_path)
with open(installer_path, "rb") as f:
file_hash = hashlib.sha512(f.read()).hexdigest()
# 生成latest.yml内容
yml_content = f"""version: {NEW_VERSION}
files:
- url: LingTropy-{NEW_VERSION}.exe
sha512: {file_hash}
size: {file_size}
path: LingTropy-{NEW_VERSION}.exe
sha512: {file_hash}
releaseDate: '{datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")}'
"""
# 写入latest.yml文件
yml_path = os.path.join(UPDATES_DIR, "latest.yml")
with open(yml_path, "w", encoding="utf-8") as f:
f.write(yml_content)
log(f"已生成 latest.yml 文件: {yml_path}")
log(f"文件内容:\n{yml_content}")
# 自定义HTTP请求处理器
class UpdateRequestHandler(http.server.SimpleHTTPRequestHandler):
def log_message(self, format, *args):
log(f"{self.address_string()} - {format%args}")
def do_GET(self):
log(f"收到GET请求: {self.path}")
try:
# 处理 /updates 路径
if self.path.startswith('/updates'):
# 移除 /updates 前缀
self.path = self.path[len('/updates'):]
if not self.path:
self.path = '/'
log(f"处理更新请求,修改后的路径: {self.path}")
# 设置CORS头
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, OPTIONS')
self.send_header('Access-Control-Allow-Headers', '*')
# 根据请求路径设置Content-Type
if self.path.endswith('.yml'):
log("返回YAML文件")
self.send_header('Content-Type', 'application/x-yaml')
elif self.path.endswith('.exe'):
log("返回EXE文件")
self.send_header('Content-Type', 'application/octet-stream')
else:
log("返回文本内容")
self.send_header('Content-Type', 'text/plain')
self.end_headers()
# 如果请求的是根路径,返回更新信息
if self.path == '/':
response = {
"version": NEW_VERSION,
"releaseDate": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z"),
"releaseNotes": "This is a test update with some new features and bug fixes."
}
log(f"返回更新信息: {json.dumps(response, indent=2)}")
self.wfile.write(json.dumps(response).encode())
return
# 否则,尝试提供请求的文件
file_path = self.translate_path(self.path)
log(f"请求文件路径: {file_path}")
if os.path.exists(file_path) and os.path.isfile(file_path):
log(f"文件存在,正在发送...")
with open(file_path, 'rb') as f:
self.wfile.write(f.read())
else:
log(f"文件不存在: {file_path}")
self.send_error(404, "File not found")
except Exception as e:
log(f"处理请求时出错: {str(e)}")
self.send_error(500, str(e))
def do_OPTIONS(self):
log(f"收到OPTIONS请求: {self.path}")
# 处理预检请求
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, OPTIONS')
self.send_header('Access-Control-Allow-Headers', '*')
self.end_headers()
def main():
try:
# 生成更新文件
generate_latest_yml()
# 启动服务器
with socketserver.TCPServer(("", PORT), UpdateRequestHandler) as httpd:
log(f"服务器已启动在 http://localhost:{PORT}")
log("按 Ctrl+C 停止服务器")
httpd.serve_forever()
except KeyboardInterrupt:
log("服务器已停止")
sys.exit(0)
except Exception as e:
log(f"服务器错误: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()