重读 skynet 源码

1. 为什么使用 daemon(1,1) + redirect_fds 来设置后台模式而不直接使用 daemon(1,0)?

cloudwu/skynet#590

因为 write_pid 里输出错误信息到 stderr,此时已经是 daemon 模式,stderr 已经被重定向为 /dev/null,所有错误信息是看不到的。
所以需要延迟把 stderr 这些 fd 重定向为 /dev/null 。

2. 为什么 global_mq 从 cas 无锁结构改为 spinlock ?

https://blog.codingnow.com/2014/12/skynet_spinlock.html

cloudwu/skynet#208

在 skynet 的核心消息队列调度模块中,无锁结构能获得的好处其实非常有限。使用 spinlock ,数据结构也可以大大简化。

3. 基础知识点整理

字符串操作

多线程私有数据 pthread_key_create

最初 skynet 是使用 __thread 修饰符来处理线程私有数据的,后面改成 pthread_key_t 来处理。

pthread_key_t 优于 __thread 从下面几个方面来说:

4. 信号 signal 的使用

skynet 处理了 SIGHUP 和 SIGPIPE 两个信号.

SIGHUP 信号处理

	struct sigaction sa;
	sa.sa_handler = &handle_hup;
	sa.sa_flags = SA_RESTART;
	sigfillset(&sa.sa_mask);
	sigaction(SIGHUP, &sa, NULL);

SIGPIPE 信号处理

    struct sigaction sa;
	sa.sa_handler = SIG_IGN; // 信号被忽略
	sa.sa_flags = 0;
	sigemptyset(&sa.sa_mask);
	sigaction(SIGPIPE, &sa, NULL);

为什么需要忽略 SIGPIPE 信号?

TCP 是全双工信道, 可看作两条单工信道, TCP 连接两端的两个端点各负责一条. 当对端调用 close 时, 虽然本意是关闭整个两条信道,
但本端只是收到 FIN 包. 按照 TCP 协议的语义, 表示对端只是关闭了其所负责的那一条单工信道, 仍然可以继续接收数据. 也就是说, 因为 TCP 协议的限制,
一个端点无法获知对端的 socket 是调用了 close 还是 shutdown.
对一个已经收到 FIN 包的 socket 调用 read 方法,
如果接收缓冲已空, 则返回 0, 这就是常说的表示连接关闭. 但第一次对其调用 write 方法时, 如果发送缓冲没问题, 会返回正确写入(发送).
但发送的报文会导致对端发送 RST 报文, 因为对端的 socket 已经调用了 close, 完全关闭, 既不发送, 也不接收数据. 所以,
第二次调用 write 方法(假设在收到 RST 之后), 会生成 SIGPIPE 信号, 导致进程退出.
为了避免进程退出, 可以捕获 SIGPIPE 信号, 或者忽略它, 给它设置 SIG_IGN 信号处理函数,
这样, 第二次调用 write 方法时, 会返回-1, 同时 errno 置为 SIGPIPE. 程序便能知道对端已经关闭.

为什么使用 sigaction 而不使用 signal

sigaction() is POSIX. signal() is bad and poorly-defined, but is a C standard function and so it works on anything.
https://stackoverflow.com/questions/231912/what-is-the-difference-between-sigaction-and-signal
优先推荐使用 sigaction

下面这段代码来自 Lua lua.c , 在 POSIX 平台优先使用 sigaction

#if defined(LUA_USE_POSIX)   /* { */

/*
** Use 'sigaction' when available.
*/
static void setsignal (int sig, void (*handler)(int)) {
  struct sigaction sa;
  sa.sa_handler = handler;
  sa.sa_flags = 0;
  sigemptyset(&sa.sa_mask);  /* do not mask any signal */
  sigaction(sig, &sa, NULL);
}

#else                       /* }{ */

#define setsignal            signal

#endif                      /* } */

5. skynet_handle.c 分析

skynet_handle 有两个用途,第一个是给服务分配一个正整数作为服务 id,第二个是给服务绑定名字。

struct handle_name {
	char * name;
	uint32_t handle;
};

struct handle_storage {
	struct rwlock lock;

	uint32_t harbor; // 进程节点id
	uint32_t handle_index; // 下一个可分配的id
	int slot_size; // slot 数组的大小
	struct skynet_context ** slot; // 存放 ctx 的数组, 数组下标为服务的地址

	int name_cap; // handle_name 数组容量
	int name_count; // handle_name 数量
	struct handle_name *name; // 按名字排序的数组,采用二分查找插入元素.
};

slot 是存放 skynet_context 的 hash 表,接口 skynet_handle_register 为服务分配 id。一个服务对应一个 skynet_context 。handle_index 为下一个服务 id ,服务 id 的范围从 1 到 0xffffff,服务 id 的高 8 位保留,作为 master/slave 模式的 harbor id 。

取 hash 比较简单: int hash = handle & (s->slot_size-1); ,当 slot_size 为 4 时:

刚好控制 hash 的范围是 [0,3] , 用 & 运算符比 % 运算符更快。hash 冲突就是简单的往后遍历寻找空位。

slot 数组被全都被分配完之后,就扩大 2 倍,然后把旧的数据重新 hash 放到新的 slot 里。最后再进入循环重新找一次空位。

skynet_handle_retire 就是从 slot 中删除服务。用服务 id 取 hash 值,然后从 slot 中移除,移除时同时从 name 数组里把绑定的名字都移除。移除名字刚开始看为什么需要使用遍历的 O(n) 而不是使用二分查找,最后发现是因为支持给一个服务注册多个名字的。所以需要遍历所有的名字,检查到 服务 id 相同的就清理。

skynet_handle_retireall 就是循环遍历 slot ,逐个调用 skynet_handle_retire 把服务删除。

skynet_handle_grab 的目的就是找到 context 然后调用 skynet_context_grab

skynet_handle_findname 是通过名字查找服务 id ,采用二分查找,name 数组是按名字排序的。

skynet_handle_namehandle 给服务取个名字,先通过二分查找找到名字插入的位置 before ,然后插入空位。如果没空位了(s->name_count >= s->name_cap) 就把数组扩容再插入。

skynet_handle_init 就是初始化 static struct handle_storage *H

s->slot_size = DEFAULT_SLOT_SIZE; // 存放 context 的初始大小 4
s->name_cap = 2; // 存放名字的初始大小 2

struct rwlock lock; 使用读写锁是因为读取比插入删除更频繁,每一条消息的处理都需要根据 handle 来取 skynet_context (skynet_handle_grab),如果是根据名字发送的消息,都是需要先通过名字查询到 handle (skynet_handle_findname)。

一个崩溃问题重现:

int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1); 这行代码有可能会崩溃,当 s->slot[0] 为 NULL 时,即 context 为 NULL,对 NULL 取 handle 值会崩溃。需要构造出 slot[0] 为空,其他值都不空,且 handle_index 处于 (HANDLE_MASK - slot_size, HANDLE_MASK] 范围内。

已修复 cloudwu/skynet#1574

另外给服务取名的功能是云风想要废弃的功能,可以从底层删掉后,放到上层来做。来源: cloudwu/skynet#1383

6. monitor 分析

主要代码实现在 skynet_start.c 和 skynet_monitor.c 。启动入口为 start(int thread) 函数。 thread 为 worker 线程的数量,另外还有 3 个特殊线程,分别为 monitor 线程(监视 worker 线程),timer 线程, socket 线程。

四种线程的入口函数如下:

create_thread(&pid[0], thread_monitor, m);
create_thread(&pid[1], thread_timer, m);
create_thread(&pid[2], thread_socket, m);
create_thread(&pid[i+3], thread_worker, &wp[i]);

线程之间的关系如下:

监视器的数据结构如下,进程内全局只有一个,跟 struct handle_storage 一样。

struct monitor {
	int count; // worker 线程数量
	struct skynet_monitor ** m; // 一个元素监视一个 worker 线程
	pthread_cond_t cond; // 条件变量,调度 worker 线程
	pthread_mutex_t mutex; // 互斥锁, 调度 worker 线程
	int sleep; // worker 线程睡眠的数量
	int quit; // 进程退出标记
};

关于条件变量和互斥锁的使用可以参考 https://www.cnblogs.com/cthon/p/9084735.html

pthread_cond_wait 必须放在 pthread_mutex_lock 和 pthread_mutex_unlock 之间,因为他要根据共享变量的状态来决定是否要等待,而为了不永远等待下去所以必须要在 lock/unlock 队中。共享变量的状态改变必须遵守 lock/unlock 的规则。 pthread_cond_signal 既可以放在 pthread_mutex_lock 和 pthread_mutex_unlock 之间,也可以放在 pthread_mutex_lock 和 pthread_mutex_unlock 之后。

static void *
thread_monitor(void *p) {
	// ...
	for (;;) {
		CHECK_ABORT
		for (i=0;i<n;i++) {
			skynet_monitor_check(m->m[i]);
		}
		for (i=0;i<5;i++) {
			CHECK_ABORT
			sleep(1);
		}
	}
	// ...
}

monitor 线程的工作就是每 5 秒检测一次服务是否进入死循环 skynet_monitor_check , 过载会输出 "maybe in an endless loop" 日志,每秒也检测进程是否需要退出 CHECK_ABORT 。

struct skynet_monitor {
	ATOM_INT version; // 当前版本号,处理消息前自增一次
	int check_version; // 每次 check 时赋值
	uint32_t source;
	uint32_t destination;
};

version 自增, 5 秒后检测时 check_version 也会赋值为 version 一样的值。如果这个时候消息还没处理完,check_version 和 version 就会是相同的值,且处理消息的服务 destination 不为空,就会打印 "maybe in an endless loop" 。消息处理结束会把 destination 设置为 0 。也就是一个消息如果处理超过 5 秒就会输出告警。

timer 线程除了负责本身的 timer 工作,还对 signal_hup 信号的处理,和以及 worker 线程和 socket 线程的退出。

static void *
thread_timer(void *p) {
	// ...
	for (;;) {
		skynet_updatetime(); // 更新 timer 的时间
		skynet_socket_updatetime(); // netstat 使用
		CHECK_ABORT
		wakeup(m,m->count-1); // 只要有一个 worker 线程 sleep 就唤醒
		usleep(2500);
		if (SIG) {
			signal_hup(); // 给 logger 服务发消息
			SIG = 0;
		}
	}
	// wakeup socket thread
	skynet_socket_exit(); // 让 socket 线程退出,操作 socket 线程是采用管道,后续分析 socket 线程的时候再细说。
	// wakeup all worker thread
	pthread_mutex_lock(&m->mutex);
	m->quit = 1; // 设置退出标记
	pthread_cond_broadcast(&m->cond); // 唤醒所有的 worker 线程处理退出。
	pthread_mutex_unlock(&m->mutex);
	return NULL;
}

timer 线程每隔 2.5 毫秒会唤醒一条睡眠中的 worker 线程。timer 线程退出前,负责通知 socket 线程退出,和通知 worker 线程退出。

socket 线程就是循环调用 skynet_socket_poll 接口,返回 0 的时候就是 timer 线程调用了 skynet_socket_exit 接口后发生的情况。

static void *
thread_socket(void *p) {
	// ...
	for (;;) {
		int r = skynet_socket_poll();
		if (r==0)
			break; // SOCKET_EXIT 时,直接退出 socket 线程
		if (r<0) { // 网络出过错误,检查下是否 abort
			CHECK_ABORT
			continue;
		}
		wakeup(m,0); // 其他正常情况,如果所有 worker 线程都睡眠则唤醒一个
	}
	return NULL;
}

正常处理网络消息后,如果所有的 worker 线程都 sleep 了就换一个来处理看看有没有消息,防止 2.5 毫秒内,所有 worker 都睡眠的时候,收到网络消息时没有及时处理。

static void *
thread_worker(void *p) {
	// ...
	while (!m->quit) {
		q = skynet_context_message_dispatch(sm, q, weight); // 根据 weight 来决定一次处理多少条消息
		if (q == NULL) {
			if (pthread_mutex_lock(&m->mutex) == 0) { // 消息处理完了则睡眠
				++ m->sleep;
				// "spurious wakeup" is harmless,
				// because skynet_context_message_dispatch() can be call at any time.
				if (!m->quit)
					pthread_cond_wait(&m->cond, &m->mutex); // 唤醒后会从这里继续运行
				-- m->sleep;
				if (pthread_mutex_unlock(&m->mutex)) {
					fprintf(stderr, "unlock mutex error");
					exit(1);
				}
			}
		}
	}
	return NULL;
}

7. skynet_module.c 分析

skynet_module 的数据结构就比较简单了,采用数组存放。

struct modules {
	int count; // 模块数量
	struct spinlock lock;
	const char * path; // 模块路径,类似于 lua_cpath,用 ? 做匹配
	struct skynet_module m[MAX_MODULE_TYPE]; // 存放所有的模块,最大 32 个
};

static struct modules * M = NULL;

一个模块其实就是动态库,但是需要提供下面这些以模块名为前缀的接口:

关于 signal 的使用在 wiki 中有提到:

signal address sig 向服务发送一个信号,sig 默认为 0 。当一个服务陷入死循环时,默认信号会打断正在执行的 lua 字节码,并抛出 error 显示调用栈。这是针对 endless loop 的 log 的有效调试方法。注:这里的信号并非系统信号。

从代码上看 create 和 release 接口是支持不实现的,但官方的出的 C 服务都实现了这两个接口。

使用全局变量 M 来管理所有的 C 模块,模块和服务的关系就像类和对象的关系。创建服务的流程是先通过 skynet_module_query 查找到模块,然后 skynet_module_instance_create 创建服务的实体。

struct skynet_context * 
skynet_context_new(const char * name, const char *param) {
	// ...
	void *inst = skynet_module_instance_create(mod);
	// ...
	ctx->instance = inst;
	// ...
}

模块的查找就是普通的遍历查找了,因为 C 服务的上限定的是 32 ,一般只用到 snlua 这个 C 服务,也就越简单越好了。

struct skynet_module * 
skynet_module_query(const char * name) {
	struct skynet_module * result = _query(name);
	if (result)
		return result;

	SPIN_LOCK(M)

	result = _query(name); // double check
	// ...
}

查询使用了 double check ,锁住之前查询一次,如果模块已经加载过了直接返回,相当于大部分时间都是不会进入到锁下面的逻辑的。当 2 个线程同时在查询一个没加载过的模块时,一个线程加载完之后,另一个线程再进入的时候重复查一次就能拿到了。

加载动态库使用的是 dlopen ,提取函数使用 dlsym 。

因为没有对加载的模块卸载的需求,因此也没用上 dlclose 。因为 skynet 的定位是尽量采用 lua 来实现服务,也就只会用到 snlua 这一个 c 服务了,这是 lua 服务的基础,是不能卸载的。

8. skynet_server.c 分析

思考问题: ctx->handle = skynet_handle_register(ctx); 赋值成功前,如果此时 abort 调用 skynet_handle_retireall() ,是不是就没有调用到这个 ctx 的 skynet_handle_retireall() ,会不会造成进程无法退出?

已有结论 cloudwu/skynet#1576

skynet_server.c 可以说是 skynet 服务的核心了。可以把它分两部分来分析,一部分是 skynet_context 的创建销毁和收发消息。另一部分是 skynet_command 接口用来操作 skynet_context 。

struct skynet_node {
    ATOM_INT total; // 服务的数量
    int init; // 初始化标记
    uint32_t monitor_exit; // 监听服务退出的服务id
    pthread_key_t handle_key; // 用于存储当前worker线程处理的服务id
    bool profile;   // 是否统计服务消耗的cpu和time
};

static struct skynet_node G_NODE;

没继续读了,留坑。。。

9. skynet_mq.c 分析

没继续读了,留坑。。。

10. skynet_socket.c 分析

没继续读了,留坑。。。

11. 内置服务分析

C服务只要看 snlua 就够了,其他的 gate,harbor,logger 这三个都可以不看了,gate有lua版本,logger也有lua版本,harbor 算废弃的了。

12. 用到 hashmap 的地方

没继续读了,留坑。。。

点击进入评论 ...