Skynet 中 MongoDB 数据库操作接口的封装设计

概述

在基于 Skynet 的高性能 Lua 服务框架中,数据库连接管理是保障系统稳定与高效的关键环节。本文介绍一种针对 MongoDB 的轻量级、高并发访问方案:通过 连接池机制 + 服务代理模式 实现安全、可控的数据库操作。

核心设计目标:

该方案已在实际项目中验证其稳定性与性能表现,适用于中大型 Skynet 架构下的数据持久层建设。


系统架构设计

采用“一个服务对应一个数据库连接”的设计理念,多个连接组成连接池,业务服务不直接连接 MongoDB,而是通过专用代理服务(mongo_conn)间接访问。

架构图示

                     ┌─────────────┐                
        ┌────────────► conn  game  ├────────┐       
        │            └─────────────┘        │       
  ┌─────┴──────┐                      ┌─────▼────┐  
  │ role agent │                      │  MongoDB │  
  └─────┬──────┘                      └─────▲────┘  
        │            ┌─────────────┐        │       
        └────────────► conn  game  ├────────┘       
                     └─────────────┘

如上所示:

⚙️ 优势:连接总数可控、连接复用、隔离风险、便于监控与扩展。


mongo_conn 服务实现

mongo_conn 是真正与 MongoDB 建立物理连接的服务模块。每个实例负责一个数据库连接。

1. 初始化变量

local g_name, g_index = ...
g_index = tonumber(g_index)
local g_db
local mongo_config = config.get_table("mongo_config")
local service_name = string.format("mongo_conn:%s:%d", g_name, g_index)

命名规范为:mongo_conn:<dbname>:<index>,其中:

2. 启动并初始化连接

local function init_db(dbname)
    local db_config = mongo_config[dbname]
    assert(db_config, "Database not configured: " .. dbname)
    local dbs = mongo.client(db_config.cfg)
    g_db = dbs[dbname]
end

skynet.start(function()
    init_db(g_name)
    cmd_api.dispatch(CMD)
end)

启动时根据配置创建 MongoDB 客户端,并绑定指定数据库。

3. 封装底层操作接口

使用 raw 接口接收已编码的 BSON 数据,确保跨服务传输安全:

function CMD.find_and_modify(coll, doc)
    local col_obj = g_db[coll]
    return col_obj:findAndModify(doc)
end

function CMD.raw_safe_insert(coll, bson_str)
    local col_obj = g_db[coll]
    return col_obj:raw_safe_insert(bson_str)
end

function CMD.raw_safe_update(coll, bson_str)
    local col_obj = g_db[coll]
    log.debug("raw_safe_update", "coll", coll, "bson_str", bson_str)
    return col_obj:raw_safe_update(bson_str)
end

⚠️ 注意:参数 bson_str 必须在调用方提前完成 bson_encode,以支持 ORM 结构的正确序列化。


客户端接口库:mongo_conn 抽象封装

为简化上层使用,我们提供一套面向对象风格的客户端封装,屏蔽底层调度细节。

1. 连接对象 (conn) 实现

每个 conn 实例对应一个 mongo_conn 服务地址:

local conn_mt = {}
conn_mt.__index = conn_mt

function conn_mt:call(cmd, ...)
    return skynet.call(self.addr, "lua", cmd, ...)
end

local conn = {}
function conn.new(name, index)
    local service_name = string.format("mongo_conn:%s:%d", name, index)
    local service_addr = skynet.uniqueservice(service_name, name, index)
    log.info("mongo_conn new", "service_name", service_name, "service_addr", service_addr)
    return setmetatable({ name = service_name, addr = service_addr }, conn_mt)
end

利用 skynet.uniqueservice 确保每个 (name, index) 组合仅启动一次服务。

自定义 Loader 支持带冒号的服务名

由于服务名为 mongo_conn:dbname:index,需对默认 loader 做适配,提取真实入口文件名:

SERVICE_ARGS = ...

local args = {}
for word in string.gmatch(SERVICE_ARGS, "%S+") do
    table.insert(args, word)
end

-- 忽略冒号后的内容,用于查找服务文件
SERVICE_NAME = string.gsub(args[1], ":.*", "", 1)

这样即使服务名为 mongo_conn:game:1,仍能正确加载 service/mongo_conn.lua 文件。


2. 数据库对象 (db) 封装

db 表示一个数据库及其连接池:

local db_mt = {}
db_mt.__index = db_mt

function db_mt:get_collection(coll)
    local collection = self.collections[coll]
    if not collection then
        collection = setmetatable({
            db = self,
            coll = coll,
        }, coll_mt)
        self.collections[coll] = collection
    end
    return collection
end

-- 默认随机路由策略(可扩展为轮询或健康检查)
function db_mt:_route()
    local index = math.random(1, #self.conns)
    return self.conns[index]
end

-- 初始化连接池
function db_mt:init()
    local db_config = mongo_config[self.name]
    for i = 1, db_config.connections do
        local conn_obj = conn.new(self.name, i)
        table.insert(self.conns, conn_obj)
    end
end

3. 集合操作对象 (coll) 封装

提供高层接口,自动完成 BSON 编码与路由:

local coll_mt = {}
coll_mt.__index = coll_mt

function coll_mt:find_and_modify(doc)
    local conn_obj = self.db:_route()
    return conn_obj:call("find_and_modify", self.coll, doc)
end

function coll_mt:safe_insert(doc)
    local conn_obj = self.db:_route()
    log.debug("safe_insert doc:", "doc", doc)
    local bson_obj = bson_encode(doc)
    return conn_obj:call("raw_safe_insert", self.coll, to_lightuserdata(bson_obj))
end

function coll_mt:safe_update(query, update, upsert, multi)
    local conn_obj = self.db:_route()
    log.debug("safe_update", "query", query, "update", update, "upsert", upsert, "multi", multi)
    local bson_obj = bson_encode({
        q = query,
        u = update,
        upsert = upsert,
        multi = multi,
    })
    return conn_obj:call("raw_safe_update", self.coll, to_lightuserdata(bson_obj))
end

🔐 关键点:使用 to_lightuserdata(见 Skynet PR #2075)传递指针而非拷贝数据,减少内存开销。必须使用 skynet.call 阻塞调用,防止源数据被提前释放。


4. 模块统一入口

暴露简洁接口供外部调用:

function M.get_collection(name, coll)
    local db_obj = get_db(name)
    return db_obj:get_collection(coll)
end

使用者只需传入数据库名和集合名即可获得操作句柄。


使用示例:用户数据服务

以下为 user_db_api.lua 中的实际调用代码:

function M.create(account)
    local obj = {
        account = account,
        create_time = time.now_ms(),
    }
    log.debug("begin create account", "account", account, "obj", obj)
    local ok, err, r = g_coll_obj:safe_insert(obj)
    log.info("end create account", "account", account, "ok", ok, "err", err, "r", r)
    if not ok then
        return false, err, r
    end
    return obj
end

-- 初始化阶段获取集合句柄
skynet.init(function()
    local name = config.get("user_db_name")
    local coll = config.get("user_db_coll")
    log.info("user_db_api init", "db", name, "coll", coll)
    g_coll_obj = mongo_conn.get_collection(name, coll)
end)

接入 sproto-orm 支持

为了让 ORM 对象像普通 Lua 表一样被遍历,需重写全局 next 函数以支持元方法 __next

1. 扩展 next 函数(放入 preload.lua

-- 支持 ORM 对象迭代
local old_next = next
_G.next = function(t, i)
    local mt = getmetatable(t)
    if mt and mt.__next then
        return mt.__next(old_next, t, i)
    end
    return old_next(t, i)
end
_G.rawnext = old_next

此修改使得 pairs()ipairs() 可正常作用于 ORM 包裹的对象。


2. ORM 数据管理封装(dbmgr.lua

对外仅暴露两个核心接口:

function M.load(dbname, dbcoll, key, unique_id, default)
    -- 加载或插入文档
    local coll_obj = get_collection_obj(dbname, dbcoll)
    local ret = coll_obj:find_and_modify({
        query = { [key] = unique_id },
        update = { ["$setOnInsert"] = default or {} },
        fields = g_default_projection,
        upsert = true,
        new = true,
    })

    if ret.ok ~= 1 then
        log.error("load failed", "dbname", dbname, "dbcoll", dbcoll, "key", key, "unique_id", unique_id, "ret", ret)
        return nil
    end

    -- 使用 schema 创建 ORM 实例
    local doc = schema[dbcoll].new(ret.value)

    -- 启动定时器,周期性保存脏数据(防抖+随机延迟)
    local timer_obj = timer.repeat_random_delayed("dbmgr", db_save_interval, function()
        save_doc(coll_obj, key, unique_id, doc)
        log.debug("timer save", "dbname", dbname, "dbcoll", dbcoll, "key", key, "unique_id", unique_id)
    end)

    return doc, timer_obj
end

function M.unload(dbname, dbcoll, key, unique_id)
    -- 清理缓存、停止定时器等
    ...
end

📝 规范说明:dbcoll 名称必须与 schema 中定义的类型一致,无需额外映射配置,提升一致性与可维护性。


使用案例:rolemgr 加载角色数据

roleagent 服务中,rolemgr 模块通过 dbmgr.load 接口加载角色数据,示例如下:

-- rolemgr.lua
function M.load_role(rid)
    if g_roles[rid] then
        log.info("load role already exists", "rid", rid)
        return g_roles[rid]
    end

    -- 从数据库加载角色数据(自动创建 ORM 对象)
    local role_data = dbmgr.load(g_role_db_name, g_role_db_coll, "rid", rid)

    -- 创建角色对象并注入模块
    local role_obj = role.new(M, rid, role_data)
    g_roles[rid] = role_obj
    modules.load(role_obj, role_data)

    return role_obj
end

该方式实现了:


总结

本文提出了一套完整的 Skynet + MongoDB 数据访问解决方案,具备以下特性:

特性 说明
🧱 连接池管理 固定连接数,防止资源耗尽
🔄 负载均衡 请求随机分发至连接池成员
💾 ORM 支持 兼容 sproto-orm,支持对象化操作
🔗 跨服务安全传输 使用 to_lightuserdata 零拷贝传递 BSON
🧩 易用性高 提供简洁 API,隐藏复杂调度逻辑

该方案已在生产环境长期运行,表现稳定,适合用于构建高并发、低延迟的分布式游戏或后台服务系统。


参考资料


欢迎 Fork 并贡献改进!如果您正在构建基于 Skynet 的服务架构,这套数据库封装方案将为您提供坚实的数据访问基础。

点击进入评论 ...