首页 Python的IO多路复用
文章
取消

Python的IO多路复用

在Python的协程事件循环中,有一条很重要的语句 event_list = self._selector.select(timeout),用来在指定超时时间内找出就绪的事件,这就用到了 Python 的IO多路复用机制 selector。

selector 在不同的平台中有不同的实现,依赖于所处操作系统上的IO多路复用机制,如 selectpollepollkqueuedevpoll。Python 优先使用 epollkqueuedevpoll,其次 poll,再其次 select

节选自 asyncio/selector.py(Python3.7,下同)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

关于这些IO多路复用机制支持的操作平台区别如下:

  • select,支持类 Unix系统(Linux、MacOS)、Windows(只支持socket)
  • poll,支持 Linux
  • epoll,支持 Linux
  • kqueue,支持 MacOS
  • devpoll,支持 Linux

不同 selector 的实现关系如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
                              BaseSelector
                                    |
                                    |
                            _BaseSelectorImpl
                                    |
                                    |
|-----------------------------------|---------------------------|
|                                   |                           |
SelectSelector               _PollLikeSelector            KqueueSelector  
                                    |
                                    |
                  |-----------------|--------------|
            PollSelector      EpollSelector   DevpollSelector

BaseSelector 是一个抽象类,定义抽象方法。

_BaseSelectorImpl 是对 BaseSelector 的基础实现,主要是管理文件描述符的注册、移除、修改,以便底层IO多路复用模块进行监视。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class _BaseSelectorImpl(BaseSelector):
    """Base selector implementation."""

    def __init__(self):
        # this maps file descriptors to keys
        self._fd_to_key = {}
        # read-only mapping returned by get_map()
        self._map = _SelectorMapping(self)

    # 将文件(抽象文件,下同)对象转化为一个文件描述符
    def _fileobj_lookup(self, fileobj):
        """Return a file descriptor from a file object.

        This wraps _fileobj_to_fd() to do an exhaustive search in case
        the object is invalid but we still have it in our map.  This
        is used by unregister() so we can unregister an object that
        was previously registered even if it is closed.  It is also
        used by _SelectorMapping.
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            # Do an exhaustive search.
            for key in self._fd_to_key.values():
                if key.fileobj is fileobj:
                    return key.fd
            # Raise ValueError after all.
            raise

    def register(self, fileobj, events, data=None):
        # 将文件、要监视的事件、数据注册进监视集合
        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
            raise ValueError("Invalid events: {!r}".format(events))

        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)

        if key.fd in self._fd_to_key:
            raise KeyError("{!r} (FD {}) is already registered"
                           .format(fileobj, key.fd))

        self._fd_to_key[key.fd] = key
        return key

    def unregister(self, fileobj):
        # 将文件从监视集合中移除
        try:
            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return key

    def modify(self, fileobj, events, data=None):
        # 修改文件的监听事件和数据
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            # 没有注册过
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        if events != key.events:
            # 要监听的事件有变化,重新注册
            self.unregister(fileobj)
            key = self.register(fileobj, events, data)
        elif data != key.data:
            # 数据有变化,更新
            # Use a shortcut to update the data.
            key = key._replace(data=data)
            self._fd_to_key[key.fd] = key
        return key

    def close(self):
        # 关闭,清空数据
        self._fd_to_key.clear()
        self._map = None

    def get_map(self):
        return self._map

    def _key_from_fd(self, fd):
        # 从一个文件描述符获得对应的监视集合的key
        """Return the key associated to a given file descriptor.

        Parameters:
        fd -- file descriptor

        Returns:
        corresponding key, or None if not found
        """
        try:
            return self._fd_to_key[fd]
        except KeyError:
            return None

关于接下来具体的 selector 定义,我们拿 EpollSelector 来研究下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 如果当前 select 模块有 epoll 属性,即当前操作系统支持 epoll
if hasattr(select, 'epoll'):

    class EpollSelector(_PollLikeSelector):
        """Epoll-based selector."""
        _selector_cls = select.epoll
        _EVENT_READ = select.EPOLLIN
        _EVENT_WRITE = select.EPOLLOUT

        # 获取监听器的文件描述符,在 epoll 中是一个文件描述符管理多个文件描述符,这个返回的是前者
        def fileno(self):
            return self._selector.fileno()

        # 选择出就绪的事件
        def select(self, timeout=None):
            if timeout is None:
                timeout = -1
            elif timeout <= 0:
                timeout = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3) * 1e-3

            # epoll_wait() expects `maxevents` to be greater than zero;
            # we want to make sure that `select()` can be called when no
            # FD is registered.
            max_ev = max(len(self._fd_to_key), 1)

            ready = []
            try:
                # 获取就绪的文件描述符
                fd_event_list = self._selector.poll(timeout, max_ev)
            except InterruptedError:
                # 被中断的话直接返回
                return ready
            for fd, event in fd_event_list:
                events = 0
                if event & ~select.EPOLLIN:
                    # fd此时可写
                    events |= EVENT_WRITE
                if event & ~select.EPOLLOUT:
                    # fd此时可读
                    events |= EVENT_READ

                # 从文件描述符获取 key。_key_from_fd 其实是个dict,{fd:key} 形式
                key = self._key_from_fd(fd)
                if key:
                    # key 既然存在,那么就添加进就绪列表
                    ready.append((key, events & key.events))
            # 返回就绪的文件描述符(以元组形式)
            return ready

        def close(self):
            self._selector.close()
            super().close()

上面的 key 其实是这样的一个对象,其内部包含文件对象、文件描述符、时间和数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])

SelectorKey.__doc__ = """SelectorKey(fileobj, fd, events, data)

    Object used to associate a file object to its backing
    file descriptor, selected event mask, and attached data.
"""
if sys.version_info >= (3, 5):
    SelectorKey.fileobj.__doc__ = 'File object registered.'
    SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
    SelectorKey.events.__doc__ = 'Events that must be waited for on this file object.'
    SelectorKey.data.__doc__ = ('''Optional opaque data associated to this file object.
    For example, this could be used to store a per-client session ID.''')

上面代码中有行关键的代码:fd_event_list = self._selector.poll(timeout, max_ev),其中 _selectorselect.epoll 的实例化对象,是对操作系统 epoll 的调用封装。

简而言之,selector 是对各系统平台最佳底层 IO 多路复用模块的封装,将IO事件封装和管理,然后调用底层函数获取出就绪的文件描述符、事件和数据。

我们用socket服务做个实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import selectors

import socket

sel = selectors.DefaultSelector()

print("default selector on my system:", sel)


def accept(sck):
    conn, addr = sck.accept()
    print(f"get connection from {addr}")
    conn.setblocking(False)
    # 注册socket的可读事件,当socket可读的时候,回调 read_data 方法
    sel.register(conn, selectors.EVENT_READ, read_data)


def read_data(conn):
    # 读取数据
    data = conn.recv(1024)
    print(f"data received:{data}")
    if data:
        content = data.decode("utf8")
        response = f"I get {content},thank you".encode()
        conn.send(response)
    else:
        conn.send(f"I did not get data from you")
    conn.close()


if __name__ == '__main__':
    sock = socket.socket()
    sock.bind(("127.0.0.1", 8000))
    sock.listen()
    sock.setblocking(False)
    # 注册可读事件
    sel.register(sock, selectors.EVENT_READ, accept)
    while True:
        # 阻塞地等待就绪的socket事件
        events = sel.select()

        for key, mask in events:
            callback = key.data  # 调用 read_data 函数
            callback(key.fileobj)

我们用一个客户端连接此 server 然后发送数据 hi,welcome to https://uphie.studio,得到打印结果:

1
2
3
default selector on my system: <selectors.KqueueSelector object at 0x7fa24dc33978>
get connection from ('127.0.0.1', 60974)
data received:b'hi,welcome to https://uphie.studio'

由于我电脑使用的是 Mac,所以 selector 是 KqueueSelector 实现。

参考链接:https://github.com/python/cpython/blob/3.7/Lib/selectors.py

本文由作者按照 CC BY 4.0 进行授权