import websockets.sync.client, time, json, threading, time
from pymud import PyMudApp, Session
WEBSOCKET_URI = "ws://127.0.0.1:5556"
# 固定模板格式:插件名称
PLUGIN_NAME = "websocket插件"
# 固定模板格式:插件描述
PLUGIN_DESC = {
"VERSION": "1.0.0",
"AUTHOR": "shanghua",
"RELEASE_DATE": "2025-03-26",
"DESCRIPTION": "通过socket为客户端提供消息服务",
}
class WSServer:
def __init__(self, app) -> None:
self.app = app
self.session = None
self.ws = None
self.running = False
app.set_status(f"插件 {PLUGIN_NAME} 已加载!")
def start_connection(self, session):
self.session = session
# 启动连接线程
self.running = True
self.conn_thread = threading.Thread(target=self._connection_loop)
self.conn_thread.daemon = True
self.conn_thread.start()
def _connection_loop(self):
self.session.info("开始连接WebSocket")
while self.running:
try:
# 同步连接 WebSocket 服务器
try:
self.ws = websockets.sync.client.connect(WEBSOCKET_URI)
self.app.set_status(f"连接 WebSocket 服务成功: {WEBSOCKET_URI}")
except Exception as e:
print(f"连接 WebSocket 服务时出错: {e}")
# 心跳线程
heartbeat_thread = threading.Thread(target=self._heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()
# 消息接收循环
while self.running:
try:
response = self.ws.recv()
if type(response).__name__ == "str":
response = json.loads(response)
if response["type"] == "web":
self.session.exec(response["cmd"])
self.app.set_status(f"消息: {response['cmd']}")
except Exception as e:
self.app.set_status(f"接收错误: {e}")
break
except Exception as e:
self.app.set_status(f"连接失败: {e}")
time.sleep(5) # 重连间隔
finally:
self.close()
# 每30秒发送心跳,与服务器保持连接
def _heartbeat(self):
while self.running and self.ws:
try:
time.sleep(30)
# ws自动发送ping
self.ws.ping()
except Exception as e:
self.app.set_status(f"心跳发送失败: {e}")
break
def close(self):
self.running = False
if self.ws:
self.ws.close()
# 下边3个函数是固定的生命周期,函数名不能修改,否则插件无法加载
# /**
# * 固定模板格式:在应用启动时执行
# * 插件启动时实例化上边的类,并且将实例设置为全局变量,方便其他插件使用
# * @param app 应用实例
# */
def PLUGIN_PYMUD_START(app: PyMudApp):
ws = WSServer(app)
# 设置全局变量,保存ws实例
app.set_globals("ws", ws)
pass
# /**
# * 固定模板格式:在每一个会话创建时执行
# * 通过session获取ws实例,并且调用start_connection方法,启动连接
# * @param session 会话实例
# */
def PLUGIN_SESSION_CREATE(session: Session):
ws = session.application.get_globals("ws") # 这里已经正确获取了全局变量
ws.start_connection(session)
pass
# 固定模板格式:在某一个会话被销毁(关闭)时执行
def PLUGIN_SESSION_DESTROY(session: Session):
pass