注册 登录  
 加关注
查看详情
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

FY

Johnson 's Blog

 
 
 

日志

 
 

memcached Master-Worker 模型分析(3)  

2011-12-13 15:23:21|  分类: memcached |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
链接出处


先看主线程,在thread_init返回(所有线程初始化完成)之后,main函数做了一些其他的初始化之后就调用了 event_base_loop(main_base, 0);这个函数开始处理网络事件,接受连接了。在此之前,main函数在绑定监听端口的时候就已经把监听socket的事件加到了main_base中了 (参看server_socket函数,不多说)。监听事件的回调函数是memcached中所有网络事件公用的回调函数event_handler,而 这个event_handler也是基本什么都不干,直接又调用drive_machine,这个函数是由一个大大是switch组成的大状态机。这里就 是memcached所有网络事件的处理中枢,我们来看看:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

staticvoid

drive_machine(conn *c) {

    boolstop
=
false;

    intsfd,
flags = 1;

    socklen_t
addrlen;

    structsockaddr_storage
addr;

    intnreqs
= settings.reqs_per_event;

    intres;

 

    while(!stop)
{

 

        switch(c->state)
{

        //这个监听状态只有主线程的监听fd才会有,而主线程也就基本就这么一个状态

        caseconn_listening:

            //到这,说明有新连接来了

            //accept新连接

            addrlen
=
sizeof(addr);

            if((sfd
= accept(c->sfd, (
structsockaddr
*)&addr, &addrlen)) == -1) {

                if(errno==
EAGAIN ||
errno==
EWOULDBLOCK) {

                    /*
these are transient, so don't log anything */

                    stop
=
true;

                }elseif

(
errno==
EMFILE) {

                    if(settings.verbose
> 0)

                        fprintf(stderr,"Too
many open connections\n"
);

                    accept_new_conns(false);

                    stop
=
true;

                }else{

                    perror("accept()");

                    stop
=
true;

                }

                break;

            }

 

            //设置套接字非阻塞

            if((flags
= fcntl(sfd, F_GETFL, 0)) < 0 ||

                fcntl(sfd,
F_SETFL, flags | O_NONBLOCK) < 0) {

                perror("setting
O_NONBLOCK"
);

                close(sfd);

                break;

            }

 

            //将新连接分给worker线程

            dispatch_conn_new(sfd,
conn_new_cmd, EV_READ | EV_PERSIST,

                                     DATA_BUFFER_SIZE,
tcp_transport);

            stop
=
true;

            break;

 

            //下面是worker线程的一些事件,此处略

        caseconn_waiting:

            //...

 

        caseconn_read:

            //...

    }

 

    return;

}

接着看dispatch_conn_new这个函数:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

voiddispatch_conn_new(intsfd,
enumconn_states
init_state,
intevent_flags,

                       intread_buffer_size,
enumnetwork_transport
transport) {

    //分配一个连接队列item,此item将会由主线程塞到worker线程的连接队列中

    CQ_ITEM
*item = cqi_new();

 

    //RR轮询得到这个连接的目标线程

    inttid
= (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD
*
thread=
threads + tid;

    last_thread
= tid;

 

    //初始化item

    item->sfd
= sfd;

    item->init_state
= init_state;

    item->event_flags
= event_flags;

    item->read_buffer_size
= read_buffer_size;

    item->transport
= transport;

 

    //将item塞到worker线程的队列中

    cq_push(thread->new_conn_queue,
item);

 

    MEMCACHED_CONN_DISPATCH(sfd,thread->thread_id);

    //向worker线程的通知写fd中写一个字节,如此notify_receive_fd就会有一个字节可读

    //这样worker线程的notify_event就会收到一个可读的事件

    //memcached就是这样来达到线程间异步通知的目的,很tricky

    if(write(thread->notify_send_fd,"",
1) != 1) {

        perror("Writing
to thread notify pipe"
);

    }

}

好了,自此主线程处理连接的逻辑基本就没了,下面看看worker线程的相关代码。

  评论这张
 
阅读(225)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2018