概述
在基于 Skynet 的高性能 Lua 服务框架中,数据库连接管理是保障系统稳定与高效的关键环节。本文介绍一种针对 MongoDB 的轻量级、高并发访问方案:通过 连接池机制 + 服务代理模式 实现安全、可控的数据库操作。
核心设计目标:
- ✅ 控制数据库总连接数,避免资源滥用;
- ✅ 支持 ORM 对象无缝持久化;
- ✅ 跨服务安全传递已序列化的 BSON 数据;
- ✅ 提供简洁统一的高层 API 接口。
该方案已在实际项目中验证其稳定性与性能表现,适用于中大型 Skynet 架构下的数据持久层建设。
系统架构设计
采用“一个服务对应一个数据库连接”的设计理念,多个连接组成连接池,业务服务不直接连接 MongoDB,而是通过专用代理服务(mongo_conn
)间接访问。
架构图示
┌─────────────┐
┌────────────► conn game ├────────┐
│ └─────────────┘ │
┌─────┴──────┐ ┌─────▼────┐
│ role agent │ │ MongoDB │
└─────┬──────┘ └─────▲────┘
│ ┌─────────────┐ │
└────────────► conn game ├────────┘
└─────────────┘
如上所示:
conn game
表示数据库game
的连接池,本例大小为 2;role agent
是玩家角色服务,不直连数据库,所有请求经由连接池转发;- 每个
mongo_conn
服务维护一条独立的 MongoDB 连接。
⚙️ 优势:连接总数可控、连接复用、隔离风险、便于监控与扩展。
mongo_conn
服务实现
mongo_conn
是真正与 MongoDB 建立物理连接的服务模块。每个实例负责一个数据库连接。
1. 初始化变量
local g_name, g_index = ...
g_index = tonumber(g_index)
local g_db
local mongo_config = config.get_table("mongo_config")
local service_name = string.format("mongo_conn:%s:%d", g_name, g_index)
命名规范为:mongo_conn:<dbname>:<index>
,其中:
dbname
:目标数据库名;index
:连接池中的序号(从1开始)。
2. 启动并初始化连接
local function init_db(dbname)
local db_config = mongo_config[dbname]
assert(db_config, "Database not configured: " .. dbname)
local dbs = mongo.client(db_config.cfg)
g_db = dbs[dbname]
end
skynet.start(function()
init_db(g_name)
cmd_api.dispatch(CMD)
end)
启动时根据配置创建 MongoDB 客户端,并绑定指定数据库。
3. 封装底层操作接口
使用 raw 接口接收已编码的 BSON 数据,确保跨服务传输安全:
function CMD.find_and_modify(coll, doc)
local col_obj = g_db[coll]
return col_obj:findAndModify(doc)
end
function CMD.raw_safe_insert(coll, bson_str)
local col_obj = g_db[coll]
return col_obj:raw_safe_insert(bson_str)
end
function CMD.raw_safe_update(coll, bson_str)
local col_obj = g_db[coll]
log.debug("raw_safe_update", "coll", coll, "bson_str", bson_str)
return col_obj:raw_safe_update(bson_str)
end
⚠️ 注意:参数bson_str
必须在调用方提前完成bson_encode
,以支持 ORM 结构的正确序列化。
客户端接口库:mongo_conn
抽象封装
为简化上层使用,我们提供一套面向对象风格的客户端封装,屏蔽底层调度细节。
1. 连接对象 (conn
) 实现
每个 conn
实例对应一个 mongo_conn
服务地址:
local conn_mt = {}
conn_mt.__index = conn_mt
function conn_mt:call(cmd, ...)
return skynet.call(self.addr, "lua", cmd, ...)
end
local conn = {}
function conn.new(name, index)
local service_name = string.format("mongo_conn:%s:%d", name, index)
local service_addr = skynet.uniqueservice(service_name, name, index)
log.info("mongo_conn new", "service_name", service_name, "service_addr", service_addr)
return setmetatable({ name = service_name, addr = service_addr }, conn_mt)
end
利用 skynet.uniqueservice
确保每个 (name, index)
组合仅启动一次服务。
自定义 Loader 支持带冒号的服务名
由于服务名为 mongo_conn:dbname:index
,需对默认 loader 做适配,提取真实入口文件名:
SERVICE_ARGS = ...
local args = {}
for word in string.gmatch(SERVICE_ARGS, "%S+") do
table.insert(args, word)
end
-- 忽略冒号后的内容,用于查找服务文件
SERVICE_NAME = string.gsub(args[1], ":.*", "", 1)
这样即使服务名为 mongo_conn:game:1
,仍能正确加载 service/mongo_conn.lua
文件。
2. 数据库对象 (db
) 封装
db
表示一个数据库及其连接池:
local db_mt = {}
db_mt.__index = db_mt
function db_mt:get_collection(coll)
local collection = self.collections[coll]
if not collection then
collection = setmetatable({
db = self,
coll = coll,
}, coll_mt)
self.collections[coll] = collection
end
return collection
end
-- 默认随机路由策略(可扩展为轮询或健康检查)
function db_mt:_route()
local index = math.random(1, #self.conns)
return self.conns[index]
end
-- 初始化连接池
function db_mt:init()
local db_config = mongo_config[self.name]
for i = 1, db_config.connections do
local conn_obj = conn.new(self.name, i)
table.insert(self.conns, conn_obj)
end
end
3. 集合操作对象 (coll
) 封装
提供高层接口,自动完成 BSON 编码与路由:
local coll_mt = {}
coll_mt.__index = coll_mt
function coll_mt:find_and_modify(doc)
local conn_obj = self.db:_route()
return conn_obj:call("find_and_modify", self.coll, doc)
end
function coll_mt:safe_insert(doc)
local conn_obj = self.db:_route()
log.debug("safe_insert doc:", "doc", doc)
local bson_obj = bson_encode(doc)
return conn_obj:call("raw_safe_insert", self.coll, to_lightuserdata(bson_obj))
end
function coll_mt:safe_update(query, update, upsert, multi)
local conn_obj = self.db:_route()
log.debug("safe_update", "query", query, "update", update, "upsert", upsert, "multi", multi)
local bson_obj = bson_encode({
q = query,
u = update,
upsert = upsert,
multi = multi,
})
return conn_obj:call("raw_safe_update", self.coll, to_lightuserdata(bson_obj))
end
🔐 关键点:使用
to_lightuserdata
(见 Skynet PR #2075)传递指针而非拷贝数据,减少内存开销。必须使用skynet.call
阻塞调用,防止源数据被提前释放。
4. 模块统一入口
暴露简洁接口供外部调用:
function M.get_collection(name, coll)
local db_obj = get_db(name)
return db_obj:get_collection(coll)
end
使用者只需传入数据库名和集合名即可获得操作句柄。
使用示例:用户数据服务
以下为 user_db_api.lua
中的实际调用代码:
function M.create(account)
local obj = {
account = account,
create_time = time.now_ms(),
}
log.debug("begin create account", "account", account, "obj", obj)
local ok, err, r = g_coll_obj:safe_insert(obj)
log.info("end create account", "account", account, "ok", ok, "err", err, "r", r)
if not ok then
return false, err, r
end
return obj
end
-- 初始化阶段获取集合句柄
skynet.init(function()
local name = config.get("user_db_name")
local coll = config.get("user_db_coll")
log.info("user_db_api init", "db", name, "coll", coll)
g_coll_obj = mongo_conn.get_collection(name, coll)
end)
接入 sproto-orm 支持
为了让 ORM 对象像普通 Lua 表一样被遍历,需重写全局 next
函数以支持元方法 __next
。
1. 扩展 next
函数(放入 preload.lua
)
-- 支持 ORM 对象迭代
local old_next = next
_G.next = function(t, i)
local mt = getmetatable(t)
if mt and mt.__next then
return mt.__next(old_next, t, i)
end
return old_next(t, i)
end
_G.rawnext = old_next
此修改使得 pairs()
和 ipairs()
可正常作用于 ORM 包裹的对象。
2. ORM 数据管理封装(dbmgr.lua
)
对外仅暴露两个核心接口:
function M.load(dbname, dbcoll, key, unique_id, default)
-- 加载或插入文档
local coll_obj = get_collection_obj(dbname, dbcoll)
local ret = coll_obj:find_and_modify({
query = { [key] = unique_id },
update = { ["$setOnInsert"] = default or {} },
fields = g_default_projection,
upsert = true,
new = true,
})
if ret.ok ~= 1 then
log.error("load failed", "dbname", dbname, "dbcoll", dbcoll, "key", key, "unique_id", unique_id, "ret", ret)
return nil
end
-- 使用 schema 创建 ORM 实例
local doc = schema[dbcoll].new(ret.value)
-- 启动定时器,周期性保存脏数据(防抖+随机延迟)
local timer_obj = timer.repeat_random_delayed("dbmgr", db_save_interval, function()
save_doc(coll_obj, key, unique_id, doc)
log.debug("timer save", "dbname", dbname, "dbcoll", dbcoll, "key", key, "unique_id", unique_id)
end)
return doc, timer_obj
end
function M.unload(dbname, dbcoll, key, unique_id)
-- 清理缓存、停止定时器等
...
end
📝 规范说明:
dbcoll
名称必须与schema
中定义的类型一致,无需额外映射配置,提升一致性与可维护性。
使用案例:rolemgr
加载角色数据
在 roleagent
服务中,rolemgr
模块通过 dbmgr.load
接口加载角色数据,示例如下:
-- rolemgr.lua
function M.load_role(rid)
if g_roles[rid] then
log.info("load role already exists", "rid", rid)
return g_roles[rid]
end
-- 从数据库加载角色数据(自动创建 ORM 对象)
local role_data = dbmgr.load(g_role_db_name, g_role_db_coll, "rid", rid)
-- 创建角色对象并注入模块
local role_obj = role.new(M, rid, role_data)
g_roles[rid] = role_obj
modules.load(role_obj, role_data)
return role_obj
end
该方式实现了:
- ✅ 数据懒加载;
- ✅ ORM 自动封装;
- ✅ 脏数据自动回写;
- ✅ 业务逻辑与数据访问解耦。
总结
本文提出了一套完整的 Skynet + MongoDB 数据访问解决方案,具备以下特性:
特性 | 说明 |
---|---|
🧱 连接池管理 | 固定连接数,防止资源耗尽 |
🔄 负载均衡 | 请求随机分发至连接池成员 |
💾 ORM 支持 | 兼容 sproto-orm,支持对象化操作 |
🔗 跨服务安全传输 | 使用 to_lightuserdata 零拷贝传递 BSON |
🧩 易用性高 | 提供简洁 API,隐藏复杂调度逻辑 |
该方案已在生产环境长期运行,表现稳定,适合用于构建高并发、低延迟的分布式游戏或后台服务系统。
参考资料
- GitHub 项目地址:https://github.com/hanxi/skyext
- Skynet 官方仓库:https://github.com/cloudwu/skynet
- 相关 PR:Support lightuserdata for cross-service data sharing #2075
欢迎 Fork 并贡献改进!如果您正在构建基于 Skynet 的服务架构,这套数据库封装方案将为您提供坚实的数据访问基础。