type
status
date
slug
summary
tags
category
icon
password
异步:不用等待一个任务执行完成就可以开始执行下一个任务。与多线程不同的是,它并不需要创建额外的线程,此外,所有函数调用都是静态分发的,也没有堆分配。
异步操作意味着当你执行某个操作时,程序不会等待该操作完成,而是继续执行其他操作
为什么不直接就用线程来实现”同时“执行多个任务呢?
- 线程开销大
- 创建一个线程需要使用很多系统调用,开销很大
- 操作系统反应可能不及时
以web Server为例
从TcpStream里面读取请求内容:一旦连接建立成功,就可以使用
TcpStream
对象来发送和接收数据- handle_connection:读取发送过来的请求,并响应“hello world”
- concat!:是一个宏,可以把里面的字符串字面量连接为一个单独的字符串
问题:一次只能响应一个请求
引入线程:一个线程在等待请求时,其余线程可以处理这个请求并发送响应
阻塞IO的问题:
因为I/O阻塞导致我们丧失了程序的控制权,除了等它执行完毕,没有更好的方式取消一个线程的执行——因为应用程序的控制器完全交给了内核,导致实现事件驱动逻辑变得非常困难
非阻塞IO
WouldBlock
将监听器设置为非阻塞模式,如果I/O请求不能立即完成,内核将返回一个 WouldBlock 错误代码。尽管被表示为错误代码,但WouldBlock并不是真正的错误。只是意味着当前操作无法立即执行完毕,我们可以决定接下来要做什么。就不会像之前一样,让监听器必须等待到一个新的连接才能进行下一步了。
自旋:无休止地接受连接但是什么也不干
所以现在必须为每个状态(Read、Write、Flush)都设置响应的操作,模仿内核调度进程
比如说读操作:
- 继续读,读的时候查看当前这个读操作的状态,如果是WouldBlock,说明当前connection现在没法继续读了,就要从剩下的连接中找到状态为Read的连接,看看它能不能继续读。总的来说就是一直遍历所有连接,找到那个可以继续往下读的,让它先把读操作进行完毕
- 同时进行差错处理:读到0字节,手动将这个连接放入一个表示已完成连接的向量,并按照先进先出的顺序将其从连接向量中移除
- 写操作同理
- 刷新完毕后连接完成
缺点:
- 只用了一个线程来处理所有的连接请求,对CPU的利用率低
- 检查是否可读写使用了系统调用,开销大
多路复用
epoll的概念:
监视多个文件描述符,看看是否有任何一个可以进行I/O操作。可以往epoll里添加资源的文件描述符和事件,表示对这个文件的这个事件感兴趣,要把它加入interest list,那么epoll就可以帮忙监视这个文件事件是否发生了。
文件:做了rCore实验,其实Linux上一切存在电脑上的东西都可以被称作文件,包括TCP socket和外部设备
调用epoll_wait()将阻塞,直到以下任一情况发生:
- 文件描述符传递了一个事件;
- 调用被信号处理器中断;
- 超时到期。
所以epoll也是一种阻塞,之前的阻塞是通过自旋来完成的,自旋就是不断查询连接是否可以继续进行相关操作,连接错误类型为WouldBlock时继续查询下一个连接状态;连接状态为Ok时执行操作。
epoll这种同时阻塞多个操作的方法被称为I/O多路复用。
- epoll::wait
接受events作为参数,初始化时,events是空的事件数据,而epoll::wait将监视注册在epoll里的文件对应的事件,如果有事件变得就绪,就会将这个事件装进events数组,返回就绪事件个数
- 首先将listener接收到一个新连接作为一个事件注册进epoll,listener一收到连接,epoll就会有知道并在loop里调用
epoll::wait
时将其加入events
- 然后listener接到连接,将这个连接可读或可写也作为一个事件注册进epoll,这样一旦有一个连接可以继续被进行读写操作,就可以将它拿出来执行读写
Events::EPOLLIN | Events::EPOLLOUT
:EPOLLIN
表示对应文件描述符上有数据可读事件,EPOLLOUT
表示对应文件描述符上有数据可写事件。- 这样做的目的是告诉 epoll 监听器,我们希望监听的是文件描述符上的可读和可写事件。
- 之前的操作是一直遍历connections数组,直到找到一个可读写的连接;但现在epoll自己就会帮忙找出所有可执行的连接
- 现在实现了在同一个线程中处理多个连接请求
Futures
概念
Rust的Future与js的Promise有很多相似之处,但是Promise一旦被创建,就会马上开始run那个task,而Future却是惰性的,也就是说使用它之前必须调用它一次
- 可以看出使用Promise,避免了setTimeout的嵌套使用,而且语义也更清晰
- 当第一个 Promise resolve 后,执行
then(() => timer(100))
,返回一个新的 Promise,在 100 毫秒后 resolve。
- 当第二个 Promise resolve 后,执行
then(() => timer(50))
,返回一个新的 Promise,在 50 毫秒后 resolve。
- 当第三个 Promise resolve 后,执行
then(() => console.log("I'm the last one"))
,这时会输出 "I'm the last one"。
什么是future?
用来代表某个在未来将会被完成的任务
Rust中的异步实现基于轮询,每个异步任务分成三个阶段:
- 轮询阶段(The Poll phase)。一个
Future
被轮询到(polled)后,会开始执行,直到被阻塞。我们经常把轮询一个Future这部分称之为执行器(executor)
- 等待阶段。事件源(通常称为reactor)注册Future等待一个事件发生,并确保当该事件准备好时唤醒相应的
Future
- 唤醒阶段。事件发生了且Future被唤醒了。现在轮询Future的那个执行器(executor)应该调度Future再次被轮询,直到它结束或者到达下一个断点
异步编程
将任务写成独立的单元,能集中在一个地方处理所有任务的调度和事件处理,从而重新获得流程控制权
- 任务:处理请求、读写数据本质上都是一个任务,是一段要被执行的代码,代表着它将在未来某个时候需要得到解析——Future的意思就是这个
- fn poll:询问这个任务是否就绪,如果就绪就改变它的状态并执行它
唤醒器
使future运转起来:
- 首先给每个future分配一个id,将(id,future)这个键值对加入任务表
- 然后调度器要让哪个future运转的话,只用根据id就可以查找到任务表里对应的future,poll它就可以使其运转起来
如果 future 是 I/O 事件,在 接到 epoll 通知时我们就知道它可以执行了。问题是我们不知道 epoll 事件对应的是哪个future, 因为 future 的执行过程都在内部的 poll 中。
我们需要 future 传递一个 ID 给调度器,它可以用这个 ID 而不是文件描述符向 epoll 注册任何 I/O 资源。通过这种方式,调度器就能把 epoll 事件和 future 对应起来了。
意思就是future是一个有待执行的任务,现在用epoll来监听事件,它当然能知道哪个事件发生了,但是它不知道此时应该使哪个future运转,因此引入ID
现在的情况是:调度器可以根据event的id,使用
future.poll(event.id)
唤醒一个future,但我们想要不靠这个id,靠一个通用的方法就可以唤醒futureWaker:每个future都具有一个Waker,可以唤醒自己并通知调度器它可以被执行
- 调度器可以为每个 future 提供一个回调函数,它被调用时更新该 future 在调度器中的状态,标记 future 为就绪。这样调度器就完全和 epoll 或 其他任何独立通知系统解耦了。
这样一来,future 可以自行决定何时通知调度器它已经准备好执行,而调度器则可以专注于轮询事件并执行相应的 future
- 现在要唤醒一个任务,只用为任务实现future特征,然后传入它的waker即可
poll(waker)
wake函数:
- Waker实例的那个闭包,执行它就可以做到唤醒一个Future
- 这样一来每个Future都可以通过在poll时传入waker,自定义自己独有的唤醒方式
反应器
future被唤醒了就需要开始执行,但是现在唯一能执行的途径就是通过epoll返回的那个事件集合找到对应的任务
反应器:
负责注册事件、添加任务、驱动epoll、调用waker的wake函数唤醒任务
添加任务和删除任务:
- 添加任务:在epoll里注册任务和它的事件
- 删除任务:将任务从任务队列移除
- wait任务:找出可执行的任务,调用其waker的wake函数将它唤醒
调度器
- spawn:往可执行任务里面添加一个任务
- run:执行任务(执行那个闭包)
- 创建一个唤醒器,也就是一个闭包:作用是将一个任务推到runnable队列
- 将任务poll一次
- 调度器一直轮询每一个目前在runnable里面的任务,并且在目前没有要执行的任务时一直等待新任务
调度器和反应器共同构成了一个 future 的运行时。调度器(runnable tasks)会跟踪哪些任务是可运行的,并轮询它们,当 epoll 告诉我们它们感兴趣的内容准备就绪时,反应器会将任务标记为可运行。
总的代码:也是异步web服务的运行时
- thread_local!
- 用于定义线程局部变量
- 线程局部变量在每个线程中都有独立的实例,彼此之间不会互相干扰,也就是说这个东西不需要在线程之间共享,不会被多个线程同时访问
Leaf futures
Runtime crate里面的leaf future可以表示像嵌套字一样的资源
stream是一个叶子future,在这种情况下,通过返回一个 future,程序可以在等待套接字读取完成时执行其他操作,而不必阻塞等待数据的到达。
Non-leaf-futures
使用 async 关键字编写的 future,它们代表了一个可以在执行器上运行的任务。在异步程序中,大部分的工作都会由这些非叶子 future 完成。通常,这样的任务会等待叶子 future(如上文所述的套接字读取操作)作为其中的一个操作,以完成任务。
非叶子 future 指的是那些代表了一组操作的 future,而不是单一操作的 future
比如说这个代码,non_leaf 是一个非叶子 future,因为它包含了多个异步操作,而不是单一的异步操作。
- 第一个异步操作是stream那一步,它是一个异步连接操作,它尝试连接到指定的 IP 地址和端口。连接完成后它会返回一个socket
- 第二个异步操作是result那一步,它尝试将数据发送到套接字中。与连接操作类似,程序将在此处暂停执行当前任务,并等待写操作完成。写操作完成后,它会返回一个结果(可能是写入的字节数)
- 第二个异步操作必须等待第一个执行完成
Runtimes 运行时
运行时(runtime)是指一种在程序执行过程中提供支持的软件环境。
在异步编程中,运行时负责管理异步任务的执行和调度。它提供了异步任务的执行环境,包括任务队列、线程池、异步任务的调度器等。
A useful mental model of an async runtime
rust的异步系统可以被分为三部分:
- Reactor 反应器
它负责监听事件并将其分发给相应的future。在异步编程中,事件可以是例如套接字准备好读取或写入数据,定时器触发等。反应器使用事件驱动的方式来管理和调度异步操作
- Executor 执行器
执行器是负责实际执行异步任务(Future)的组件。它从反应器接收分发的任务,并在合适的时机执行这些任务
- Future
代表了一个可能会在未来某个时间点完成的计算任务。未来允许我们以非阻塞的方式处理异步操作,通过异步操作的结果来触发后续的操作。
在异步编程中,Future 代表了一个异步操作的结果。每当异步操作需要进行时,它会返回一个 Future 对象,该对象表示异步操作的状态和结果。Future 可以被轮询(poll)以检查操作是否已完成,如果操作尚未完成,Future 将返回一个 Poll::Pending 来指示调用者暂时挂起。
这三个部分work together:
通过一个 the
Waker
,反应器可以告诉执行器,现在有一个Future准备好开始运行了。life cycle:
- Waker是被执行器创建的
- 当future第一次被executor轮询的时候,它会被分配一个Waker对象(由这个executor创建的)的clone。Waker是一个共享的object,因此实际上所有的克隆都是指向同一个底层的原始的对象。因此,任何一个对Waker和它的clone的调用都可以唤醒一个与之绑定的Future
- Waker 是一种异步编程的机制,用于唤醒处于挂起状态的 Future,以便它们可以继续执行。Waker 包含一个指向 Future 的引用,当操作完成并且数据可用时,Future 将调用与其关联的 Waker 来通知调度器(executor)它可以继续执行。
- 上面讲到的future会将分配到的Waker对象交给反应器,后面会用到
I/O密集型 VS CPU密集型任务
两个yield之间的代码和executor是在同一个线程上运行的,也就是说当analyzer在分析数据的时候,executor也在计算数据,而不能处理新的请求
解决方法:
- 我们可以创建一个新的
leaf future
,它将我们的任务发送到另一个线程,并在任务完成时解析。 我们可以像等待其他Future一样等待这个leaf-future
。
- 运行时可以有某种类型的管理程序来监视不同的任务占用多少时间,并将 executor 移动到不同的线程,这样即使我们的分析程序任务阻塞了原始的执行程序线程,它也可以继续运行。
- 您可以自己创建一个与运行时兼容的
reactor
,以您认为合适的任何方式进行分析,并返回一个可以等待的future。
异步的web服务
为主任务实现Future特征:
- 编写主任务:主任务掌握整个可执行任务队列的开关(waker)
- 主任务处于起始状态时:调用REACTOR的add函数,在epoll中注册“监听器监听到有新连接”事件
- 主任务处于Accept状态时:尝试获取listener,要是listener就绪,就尝试捕获一个连接
- 使用调度器来创建一个任务,这个任务就是处理这个新连接
反应器:监听事件,调度器:任务调度
任务:处理连接
逻辑跟之前一样,根据连接状态来判断任务可执行的情况下接下来应该干嘛;并且使用epoll来监听任务何时可执行——现在epoll已经被反应器封装好了,在epoll注册事件变为对反应器调用add
全功能的Web服务
poll_fn
poll_fn
的实现:- 定义
poll_fn
函数:接收一个闭包f
,该闭包接受一个Waker
并返回Option<T>
。
- 在内部定义了结构体PollFn,并为它实现了Future特征。这个结构体里面有一个闭包,当调用结构体的poll函数时,会将waker作为结构体里面闭包的参数
Closure Call
In Rust, if you have a variable that holds a closure, you can call it just like a function by using parentheses
()
and passing the required arguments.用法:
chain
Chain future允许将两个future链接在一起,使用提供的闭包从第一个future过渡到第二个future。
为Future特征定义chain函数:
- chain函数接受一个闭包transition,该闭包的操作是将一个Future的输出转换为另一个Future,chain函数返回值是一个chain枚举
- 函数创建一个Chain::First变量,将调用函数时传入的闭包transition作为这个枚举变量的transition,并将Chain::First作为函数返回值
- 当使用chain函数获得一个Chain类型的变量,调用它的poll函数时,就会产生一个迷你自动机:future1被poll之后,状态转移,Chain枚举变量由Chain::First变为Chain::Second,然后future2被poll
之前需要调用两次poll_fn来处理两个future,而这两个future之间又有联系,即listener必须先注册了才能开始接收连接。之前的处理方法是:定义一个枚举Main,表示主任务的状态,当主任务的状态为Start的时候,添加一个TCP监听器并将状态改为Accept…这样相当于是手动把自动机的状态进行转移。现在有了Chain,就可以通过poll一个Chain枚举类型,将状态进行自动转移了
现在有了Chain Future就可以把两个future连在一起了:
将handle任务也这样用chain来组合:遇到报错,因为chain获取了connection的所有权
其实只需要将connection放在堆上就可以了:这样一来,就可以确保每个chain函数都可以安全引用connection
因为在创建future的时候要使用data的引用,现在data的生命周期已经明了,但是它的位置会变化,因此使用堆上的智能指针,让它的位置不变
Scheduler和Reactor是怎么连在一起的?
优雅的Web服务
优雅退出:当按下
ctrl+c
时,不是粗暴地立刻关闭程序,而是应该立刻停止接受新的连接请求,同时等待已建立连接的请求执行完毕,超过30秒没执行完毕的请求将会被终止,然后服务端程序才退出使用signal来接受ctrl C信号, forever().next() 会阻塞线程直到收到信号。但现在我们的程序只有一个线程,阻塞线程意味着所有的其他任务都会被阻塞,所以这个接收信号的动作也应该是个future
在epoll中注册这个信号,并在接收到信号时通知主线程
spawn_blocking
让阻塞任务单独在一个线程上执行:
- 要执行阻塞任务,首先必须让这个会使整个线程阻塞的任务在一个单独的线程上执行、
- 并且要为这个阻塞任务设置状态,以便主线程知道阻塞任务完成了
- 如果状态中存储了一个唤醒器,它将被获取并调用以唤醒Future
- 用poll_fn创建一个Future,如果阻塞任务还没有完成的话,就在状态中存入一个waker
- 返回一个future,如果任务完成了,就返回一个Some(()),表示future准备好了;如果任务没完成就返回一个None,表示future没准备好
spawn_blocking 服务将该 future 作为异步版本的 JoinHandle 返回。阻塞任务在单独的线程中运行时,主线程可以异步地等待它完成
spawn_blocking
返回一个Future
,这个Future
可以在异步上下文中被await
。
- 这个
Future
的poll
方法会检查阻塞任务是否已经完成,如果完成则返回Some(())
,否则返回None
并存储Waker
spawn_blocking
是一种方便的抽象,允许我们在单独的线程中运行阻塞任务,同时返回一个Future
,这个Future
可以在主线程中被异步等待。
select
spawn_blocking 是一个非常方便的抽象,常用于处理异步程序中的阻塞 API。好的,我们有能等待 ctrl+c 信号的 future 了。如果您还记得我们的服务端使用阻塞I/O时的情景,当时我们想知道如何监视信号,才能在信号到达后立即中止连接侦听器循环。当时我们就意识到需要以某种方式来同时监听传入连接和ctrl+c信号。因为 accept 是阻塞的,这并不容易,但是有了 future 就让这变得可能了!我们将这包装成另一个 future。给定两个 future,我们能创建一个包装器实现选择功能:哪个 future 先完成,就将它的输出作为包装器的输出。
这个意思就是:
- 通过将 accept 和信号监听封装成 Future,我们可以在异步上下文中处理这两种操作,即同时监听两个信号
- 我们可以创建一个包装器
Future
,这个包装器实现了选择功能:监听多个Future
,并且哪个Future
先完成,就将它的输出作为包装器的输出。
因为我们将同一个唤醒器传递给两个 future ,任何一个 future 的进展都会通知我们,我们可以检查其中一个是否完成。
因为这里将同一个waker交给了reactor,所以有事件发生的时候,reactor就会调用wake函数
但是listen好像永远不会完成,listener将会一直监听是否有新连接。所以设计:一旦收到关闭的信号,等待当前处理中的请求请求执行完毕,30秒后关闭,要么等待30秒,要么所有处理中的请求执行完毕——这个也可以用select
select(timer, request_counter)
:两个Future- 计时器timer,计时30s就关闭程序
- let timer = spawn_blocking(|| thread::sleep(Duration::from_secs(30)));
- 新建一个线程来计时
- 计数器request_counter:
- 为了了解请求全部执行完毕的准确时间,我们需要统计执行中的请求数量
最终的结果:
Select是怎么被poll的?
Select是一个实现了Future特征的结构体,当它调用chain函数的时候就会被poll。调用chain的时候,调用它的Future1和转换过来的Future2都会被poll,调用chain的那个Future永远都是Future1
标准库
Future Trait
Waker
Pinging
Pin:
- 将
Future
在内存中固定到一个位置,防止某些Future位置移动导致对里面数据的引用会被破坏
- 它包裹一个指针,并且能确保该指针指向的数据不会被移动,例如
Pin<&mut T>
,Pin<&T>
,Pin<Box<T>>
,都能确保T
不会被移动。
Unpin:
- 它是一个特征,拥有Unpin特征的类型可以被安全移动
async/.await
async/.await
是 Rust 内置的语言特性,可以让我们用同步的方式去编写异步的代码。通过
async
标记的语法块会被转换成实现了Future
特征的状态机。 与同步调用阻塞当前线程不同,当Future
执行并遇到阻塞时,它会让出当前线程的控制权,这样其它的Future
就可以在该线程中运行,这种方式完全不会导致当前线程的阻塞。async
用于创建一个Future,不使用 poll_fn 来创建 future,而是添加 async 关键字到 fn 的前面
创建一个异步函数,异步函数的返回值是一个
Future
,若直接调用该函数,不会输出任何结果,因为 Future
还未被执行使用执行器来使用Future:
.await
在
async fn
函数中使用.await
可以等待另一个异步调用的完成。但是与block_on
不同,.await
并不会阻塞当前的线程,而是异步的等待Future A
的完成,在等待的过程中,该线程还可以继续执行其它的Future B
,最终实现了并发处理的效果。await 在等待另一个 future 完成时返回 Poll:Pending,直到 future 完成
- Author:orangec
- URL:orange’s blog | welcome to my blog (clovy.top)/0110761b7fe44daf831a109637cedbb5
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!
Relate Posts