5

开源的 c 语言网络协程库 state_thread 源码分析

 3 years ago
source link: https://my.oschina.net/u/1248746/blog/5025399
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
开源的 c 语言网络协程库 state_thread 源码分析

state thread是一个开源的c语言网络协程库,它在用户空间实现了协程调度
st最初是由网景(Netscape)公司的MSPR(Netscape Portable Runtime library)项目中剥离出来,后由SGI(Silicon Graphic Inc)和Yahoo!公司(前者是主力)共同开发维护。
2001年发布v1.0以来一直到2009年v1.9稳定版后未再变动

State Threads:回调终结者(必读)
https://blog.csdn.net/caoshangpa/article/details/79565411

st-1.9.tar.gz 是原版, http://state-threads.sourceforge.net/
state-threads-1.9.1.tar.gz 是srs修改版, https://github.com/ossrs/state-threads

st源码编译
tar zxvf st-1.9.tar.gz
cd st-1.9
make linux-debug          // make命令可以查看支持的编译选项
obj目录有编译生成的文件st.h, lib*.so,lib*.a
examples目录有几个例子lookupdns,proxy,server

需要的知识点
1 汇编语言(非必需)
2 线程的栈管理(非必需)
3 线程的调度和同步(必须)。线程不同步的测试代码thread.c
4 setjmp/longjmp的使用(必须)。测试代码setjmp.c
5 epoll原理和使用(必须)。测试代码epoll_server.c 和 epoll_client.c

分析state_thread源码的目的,是为了正确的使用它
st中thread其实是协程的概念
st_xxx分为 io类 和 延迟类

一些重要的数据结构
_st_vp_t _st_this_vp;     virtual processor 虚拟处理器
_st_thread_t *_st_this_thread;
_st_clist_t  run_q, io_q, zombie_q, thread_q 
_st_thread_t  *idle_thread, *sleep_q

代码分析
st库自带的example业务逻辑较为复杂,有兴趣可以看下。
为了简化问题,编写了测试代码st-1.9/examples/st_epoll.c,依据此代码提出问题分析问题。
st_init()做了什么?
_st_idle_thread_start()做了什么?
st_thread_create()做了什么?
st_thread_exit()做了什么?
st_usleep()做了什么?
主业务逻辑(无限循环)协程是如何调度的?
监听的文件描述符是如何调度的?
协程如何正常退出?
1 没有设置终止条件变量(不可以被join)的协程直接return即可退出; 
2 设置了终止条件变量(可以被join)的协程退出时,先把自己加入到zombie_q中,然后通知等待的协程,等待的协程退出后,自己在退出。

协程的join(连接)是什么意思?

1 创建协程a的时候 st_thread_create(handle_cycle, NULL, 1, 0) 要设置为1, 表示该协程可以被join

2 协程b代码里要掉用st_thread_join(thread, retvalp),表示我要join到协程a上

3 join的意思是 协程a和协程b 有一定关联行,在协程退出时,要先退出协程b 才能退出协程a

4 st中一个协程只能被另一个协程join,不能被多个协程join

5 可以被join的协程a,在没有其他协程join时,协程a无法正常退出

st里的mutex有什么用?

通常情况下st的多协程是不需要加锁的,但是在有些情况下需要锁来保证原子操作,下面会详细说明。

st_mutex_new(void); 创建锁

st_mutex_destroy(st_mutex_t lock); 等待队列必须为空才能销毁锁

st_mutex_lock(st_mutex_t lock); 第一次掉用能获得锁,以后掉用会加入锁的等待队列中(FIFO)

st_mutex_unlock(st_mutex_t lock); 释放锁并激活等待队列的协程

st_mutex_trylock(st_mutex_t lock); 尝试获得锁不会加入到等待队列

st里的cond有什么用?

通常情况下st的多协程是不需要条件变量的,但是有些情况下需要条件变量来保证协程执行的先后顺序,比如:协程a要先于协程b执行

st_cond_new(void); 创建条件变量

st_cond_destroy(st_cond_t cvar); 等待队列必须为空才能销毁条件变量

st_cond_timedwait(st_cond_t cvar, st_utime_t timeout); 限时等待条件变量,会加入条件变量的等待队列中(FIFO),并加入到sleep_q队列中(可能先于FIFO的顺序被调度到)

st_cond_wait(st_cond_t cvar); 阻塞等待条件变量,会加入条件变量的等待队列中(FIFO)

st_cond_signal(st_cond_t cvar); 唤醒阻塞在条件变量上的一个协程

st_cond_broadcast(st_cond_t cvar); 唤醒阻塞在条件变量上的全部协程

st中与调度有关的函数

st的setjmp

#define _ST_SWITCH_CONTEXT(_thread)   \                协程切换的两个宏函数之一,停止当前协程并运行其他协程

  ST_BEGIN_MACRO                      \

  ST_SWITCH_OUT_CB(_thread);          \                       协程切走时调用的函数,一般不管用

  if (!MD_SETJMP((_thread)->context)) \                         汇编语言实现 应该跟setjmp()一样 首次掉用返回0

    _st_vp_schedule();                \                                    核心调度函数

  ST_DEBUG_ITERATE_THREADS();         \

  ST_SWITCH_IN_CB(_thread);           \                          协程切回时调用的函数,一般不管用

  ST_END_MACRO

st的longjmp

#define _ST_RESTORE_CONTEXT(_thread) \               协程切换的两个宏函数之一,恢复线程运行

  ST_BEGIN_MACRO                     \

  _ST_SET_CURRENT_THREAD(_thread);   \                设置全局变量 _st_this_thread = _thread

  MD_LONGJMP((_thread)->context, 1); \                       汇编语言实现 应该跟longjmp()一样, 返回值永远为1

  ST_END_MACRO

MD_SETJMP的时候,会使用汇编把所有寄存器的信息保留下来,而MD_LONGJMP则会把所有的寄存器信息重新加载出来。两者配合使用的时候,可以完成函数间的跳转。

st的核心调度函数

void _st_vp_schedule(void)

  _st_thread_t *thread;

  printf("in _st_vp_schedule\n");

  printf("_st_active_count = %d\n", _st_active_count);

  if (_ST_RUNQ.next != &_ST_RUNQ)

    printf("use runq\n");

    /* Pull thread off of the run queue */

    thread = _ST_THREAD_PTR(_ST_RUNQ.next);

    _ST_DEL_RUNQ(thread);

    printf("use idle\n");

    /* If there are no threads to run, switch to the idle thread */

    thread = _st_this_vp.idle_thread;

  ST_ASSERT(thread->state == _ST_ST_RUNNABLE);

  /* Resume the thread */

  thread->state = _ST_ST_RUNNING;

  _ST_RESTORE_CONTEXT(thread);

st辅助调度函数

void *_st_idle_thread_start(void *arg)

  printf("i'm in _st_idle_thread_start()\n");

  _st_thread_t *me = _ST_CURRENT_THREAD();

  while (_st_active_count > 0)

    /* Idle vp till I/O is ready or the smallest timeout expired */

    printf("call _st_epoll_dispatch()\n");

    _ST_VP_IDLE();                                                       处理io类事件

    /* Check sleep queue for expired threads */

    _st_vp_check_clock();                                               处理延时类事件

    me->state = _ST_ST_RUNNABLE;

    _ST_SWITCH_CONTEXT(me);                                  从这里恢复运行,然后判断_st_active_count的值

  /* No more threads */

  exit(0);                                                                        整个程序退出

  /* NOTREACHED */

  return NULL;

会触发协程切换的函数有哪些?

sched.c:86: _ST_SWITCH_CONTEXT(me); 59 int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)

sched.c:234: _ST_SWITCH_CONTEXT(me); 221 void *_st_idle_thread_start(void *arg)

sched.c:261: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)

sched.c:276: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)

sync.c:131: _ST_SWITCH_CONTEXT(me); 115 int st_usleep(st_utime_t usecs)

sync.c:198: _ST_SWITCH_CONTEXT(me); 180 int st_cond_timedwait(_st_cond_t *cvar, st_utime_t timeout)

sync.c:315: _ST_SWITCH_CONTEXT(me); 290 int st_mutex_lock(_st_mutex_t *lock)

sched.c:134: _ST_RESTORE_CONTEXT(thread); 115 void _st_vp_schedule(void)

st的优缺点

1 用户空间实现协程调度,降低了用户空间和内核空间的切换,一定程度上提高了程序效率。

2 由于是在单核上的单线程多协程,同一时间只会有一个协程在运行,所以对于全局变量也不需要做协程同步。

   共享资源释放函数只需做到可重入就行,所谓的可重入就是释放之前先判断是否为空值,释放后要赋空值。

3 协程使用完,直接return即可,st会回收协程资源并做协程切换。

4 可以通过向run_q链表头部加入协程,来实现优先调度。

5 st支持多个操作系统,比如 AIX,CYGWIN,DARWIN,FREEBSD,HPUX,IRIX,LINUX,NETBSD,OPENBSD,SOLARIS

1 所有I/O操作必须使用st提供的API,只有这样协程才能被调度器管理。

2 所有协程里不能使用sleep(),sleep()会造成整个线程sleep。

3 被调度到的协程不会被限制运行时长,如果有协程是cpu密集型或死循环,就会严重阻碍其他协程运行。

4 单进程单线程,只能使用单核,想要通过多个cpu提高并发能力,只能开多个程序(进程),多进程通信较麻烦。

补充知识点

1 线程为什么要同步?

线程由内核自动调度

同一个进程上的线程共享该进程的整个虚拟地址空间

同一个进程上的线程代码区是共享的,即不同的线程可以执行同样的函数

所以在并发环境中,多个线程同时对同一个内存地址进行写入,由于CPU寄存器时间调度上的问题,写入数据会被多次的覆盖,会造成共享数据损坏,所以就要使线程同步。

2 什么情况下需要线程同步?

线程同步指的是 不同时发生,就是线程要排队

1 多核,单进程多线程,不同线程会对全局变量读写,这种情况才需要对线程做同步控制

2 单核,单进程多线程,不同线程会对全局变量读写,这种情况不需要对线程做同步控制

3 多核,单进程多线程,不同线程不对全局变量读写,这种情况不需要对线程做同步控制

4 多核,单进程多线程,不同线程会对全局变量读,这种情况不需要对线程做同步控制

问题:对于第2条,这个应该是有点儿片面,有依赖有优先级抢占也是要同步,除非像abcde这种几个线程干一摸一样的事情,而且项目之间不依赖。
有依赖 可以理解为 生产消费关系,虽然是 单核 单进程 多线程 但是同一时间只能有一个线程在运行,也就是说 生产和消费不会同时发生,同样多个生产也不会同时发生,所以不需要锁。

线程是有优先级控制,但是不管怎么控制,只要保证同一时间只能有一个线程在运行,就不需要锁了。

问题:对于第2条,这个应该是有点儿片面,有原子操作且原子操作过程中有线程切换,这种是需要锁的。

比如,线程a 第一次读取全局变量x并做处理,然后发生线程切换(线程由内和自动调度)后切回,然后第二次读取全局变量x并做处理,我们想确保两次读取的x值相同,但是发生了线程切换x值可能被改变。

如何 确保 第一次读取并处理和第二次读取并处理是原子操作呢? 使用st_mutex_t

3 accept()序列化

亦称惊群效应,亦亦称Zeeg难题

https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/SerializingAccept.html

在多次fork自己之后,每个进程一般将会开始阻塞在 accept() 上

每当socket上尝试进行一个连接,阻塞在 accept() 上的每个进程的 accept() 都会被唤醒。

只有其中一个进程能够真正接收到这个连接,而剩余的进程将会获得一个无聊的 EAGAIN 这导致了大量的CPU周期浪费,实际解决方法是把一个锁放在 accept() 调用之前,来序列化它的使用

4 Internet Applications网络程序架构

多进程架构 Multi-Process

    一个进程服务一个连接,要解决数据共享问题

单进程多线程架构 Multi-Threaded 

    一个线程服务一个连接,要解决数据同步问题

事件驱动的状态机架构 Event-Driven State Machine 

    事件触发回调函数(缺点是嵌套) 或 用户空间实现协程调度

实际上 EDSM架构 用很复杂的方式模拟了多线程

st提供的就是EDSM机制,它在用户空间实现协程调度

https://blog.csdn.net/caoshangpa/article/details/53282330


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK