快速开始
在开始编写 actix 应用程序之前需要先安装某一版本的 Rust。 建议使用 rustup 来安装或配置版本。
安装 Rust
在开始之前,我们需要使用 rustup 安装程序来安装 Rust:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
如果已经安装过 rustup,请运行这个命令来确保拥有最新版本的 Rust:
rustup update
actix 框架需要 Rust 1.40.0 及更高版本。
运行示例
开始试验 actix 最快的方法是克隆 actix 版本库并运行 examples/ 目录中所包含的示例。以下这组命令会运行 ping
示例:
git clone https://github.com/actix/actix
cargo run --example ping
更多示例请查看 examples/ 目录。
入门
让我们来创建并运行第一个 actix 应用程序。我们会创建一个新的依赖于 actix 的 Cargo 项目,然后运行该应用程序。
在上一节中,我们已经安装了所需的 rust 版本。现在来创建新的 cargo 项目。
Ping 参与者
我们来写第一个 actix 应用程序吧!首先创建一个新的基于二进制的 Cargo 项目并切换到新目录中:
cargo new actor-ping
cd actor-ping
现在,将 actix 添加为项目的依赖,即确保 Cargo.toml 中包含以下内容:
[dependencies]
actix = "0.10.0-alpha.3"
actix-rt = "1.1" # <-- Runtime for actix
我们来创建一个接受 Ping
消息并以 ping 处理后的数字作为响应的参与者。
参与者(actor)是实现 Actor
trait 的类型:
# extern crate actix; use actix::prelude::*; struct MyActor { count: usize, } impl Actor for MyActor { type Context = Context<Self>; } # # fn main() {}
每个参与者都有一个执行上下文,对于 MyActor
我们会使用 Context<A>
。关于参与者上下文的更多信息在下一节中介绍。
现在需要定义参与者需要接受的消息(Message
)。消息可以是实现
Message
trait 的任何类型。
# extern crate actix; use actix::prelude::*; #[derive(Message)] #[rtype(result = "usize")] struct Ping(usize); # # fn main() {}
Message
trait 的主要目的是定义结果类型。Ping
消息定义了
usize
,表示任何可以接受 Ping
消息的参与者都需要返回 usize
值。
最后,需要声明我们的参与者 MyActor
可以接受 Ping
并处理它。
为此,该参与者需要实现 Handler<Ping>
trait。
# extern crate actix; # use actix::prelude::*; # # struct MyActor { # count: usize, # } # impl Actor for MyActor { # type Context = Context<Self>; # } # # struct Ping(usize); # # impl Message for Ping { # type Result = usize; # } # impl Handler<Ping> for MyActor { type Result = usize; fn handle(&mut self, msg: Ping, _ctx: &mut Context<Self>) -> Self::Result { self.count += msg.0; self.count } } # # fn main() {}
就是这样。现在只需要启动我们的参与者并向其发送消息。
启动过程取决于参与者的上下文实现。在本例中我们可以使用基于 tokio/future 的 Context<A>
。可以用 Actor::start()
或者 Actor::create()
来启动。前者用于可以立即创建参与者实例的场景。
后者用于在创建参与者实例之前需要访问上下文对象的场景。对于 MyActor
参与者,我们可以使用 start()
。
所有与参与者的通信都通过地址进行。可以用 do_send
发送一条消息而不等待响应,也可以向一个参与者用 send
发送指定消息。
start()
与 create()
都会返回一个地址对象。
在以下示例中,我们会创建一个 MyActor
参与者并发送一条消息。
Here we use the actix-rt as way to start our System and drive our main Future
so we can easily .await
for the messages sent to the Actor.
# extern crate actix; # extern crate actix_rt; # use actix::prelude::*; # struct MyActor { # count: usize, # } # impl Actor for MyActor { # type Context = Context<Self>; # } # # struct Ping(usize); # # impl Message for Ping { # type Result = usize; # } # impl Handler<Ping> for MyActor { # type Result = usize; # # fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result { # self.count += msg.0; # self.count # } # } # #[actix_rt::main] async fn main() { // 启动新的参与者 let addr = MyActor { count: 10 }.start(); // 发送消息并获取结果 future let res = addr.send(Ping(10)).await; // handle() returns tokio handle println!("RESULT: {}", res.unwrap() == 20); // stop system and exit System::current().stop(); }
#[actix_rt::main]
starts the system and block until future resolves.
Ping 示例可在示例目录中找到。
参与者
Actix 是一个 rust 库,为开发并发应用程序提供了框架。
Actix 建立在参与者模型(Actor model)之上,它使应用程序可以编写为一组独立执行而又相互协作的 “参与者”,这些参与者通过消息进行通信。参与者是封装状态与行为并且会在由 actix 库提供的 Actor System 中运行的对象。
参与者在指定的上下文 Context<A>
中运行。
该上下文对象只在执行期间可用。每个参与者都有一个独立的执行上下文。该执行上下文还控制参与者的生命周期。
参与者仅通过交换消息进行通信。发送方参与者可以选择等待该响应。不能直接引用参与者,而要通过地址来引用。
任何 rust 类型都可以是一个参与者,只需实现 Actor
trait 即可。
为了能够处理指定消息,参与者必须提供这种消息的 Handler<M>
实现。所有消息都是静态类型的。可以使用异步方式处理消息。
参与者可以产生其他参与者或者将 future 或 stream 添加到执行上下文。
Actor
trait 提供了几种可以控制参与者生命周期的方法。
参与者生命周期
已启动(Started)
参与者总是以 Started
状态启动。在这一状态期间调用了该参与者的 started()
方法。Actor
trait 为这个方法提供了默认实现。
在这一状态期间可以使用参与者上下文,并且该参与者可以启动更多参与者或者注册异步流或者做任何其他所需的配置操作。
运行中(Running)
调用参与者的 started()
方法后,该参与者会转换为 Running
状态。
参与者可以无限期地处于 running
状态。
停止中(Stopping)
在以下情况下,参与者的执行状态会变更为 stopping
状态:
- 该参与者自身调用了
Context::stop
- 该参与者的所有地址都已删除。即没有其他参与者引用它。
- 在上下文中没有注册事件对象。
一个参与者可以由 stopping
状态恢复为 running
状态,通过创建一个新的地址或者添加事件对象,以及通过返回 Running::Continue
实现。
如果一个参与者状态变更为 stopping
是因为调用了 Context::stop()
,
那么该上下文会立即停止处理接入的消息,并调用
Actor::stopping()
。如果参与者没有恢复到 running
状态,那么删除所有未处理的消息。
默认这个方法返回 Running::Stop
,确认停止操作。
已停止(Stopped)
如果参与者在停止中状态期间没有修改执行上下文,那么参与者状态会变更为 Stopped
。这个状态被认为是最终状态,此时该参与者会被 drop。
消息
一个 Actor 通过发送消息与其他参与者通信。在 actix 中的所有消息都是类型化的。消息可以是任何实现了
Message
trait 的 rust 类型。Message::Result
定义了其返回值类型。
让我们来定义一个简单的 Ping
消息——接受这种消息的参与者需要返回
Result<bool, std::io::Error>
。
# extern crate actix; use actix::prelude::*; struct Ping; impl Message for Ping { type Result = Result<bool, std::io::Error>; } # # fn main() {}
产生一个参与者
如何启动一个参与者取决于它的上下文。可通过
Actor
trait 的
start
与 create
实现产生一个新的异步参与者。Actor trait 提供了几种不同的创建参与者的方式;更详细信息请查看其文档。
完整示例
# extern crate actix; # extern crate actix_rt; use actix::prelude::*; /// 定义消息 #[derive(Message)] #[rtype(result = "Result<bool, std::io::Error>")] struct Ping; // 定义参与者 struct MyActor; // 为我们的参与者提供 Actor 实现 impl Actor for MyActor { type Context = Context<Self>; fn started(&mut self, ctx: &mut Context<Self>) { println!("Actor is alive"); } fn stopped(&mut self, ctx: &mut Context<Self>) { println!("Actor is stopped"); } } /// 为 `Ping` 消息定义处理程序 impl Handler<Ping> for MyActor { type Result = Result<bool, std::io::Error>; fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result { println!("Ping received"); Ok(true) } } #[actix_rt::main] async fn main() { // 在当前线程启动 MyActor let addr = MyActor.start(); // 发送 Ping 消息。 // send() 消息返回 Future 对象,可解析出消息结果 let result = addr.send(Ping).await; match result { Ok(res) => println!("Got result: {}", res.unwrap()), Err(err) => println!("Got error: {}", err), } }
以 MessageResponse 进行响应
我们来看看上例中为 impl Handler
定义的 Result
类型。
看下我们是如何返回一个 Result<bool, std::io::Error>
的?我们能够以这种类型响应该参与者的接入消息,是因为它已经为该类型实现了 MessageResponse
trait。
这是该 trait 的定义:
pub trait MessageResponse<A: Actor, M: Message> {
fn handle<R: ResponseChannel<M>>(self, ctx: &mut A::Context, tx: Option<R>);
}
有时会需要以没有为其实现这个 trait
的类型来响应接入的消息。当出现这种情况时,我们可以自己实现该 trait。
以下是一个示例,其中我们以 GotPing
响应 Ping
消息、
以 GotPong
响应 Pong
消息。
# extern crate actix; # extern crate actix_rt; use actix::dev::{MessageResponse, ResponseChannel}; use actix::prelude::*; #[derive(Message)] #[rtype(result = "Responses")] enum Messages { Ping, Pong, } enum Responses { GotPing, GotPong, } impl<A, M> MessageResponse<A, M> for Responses where A: Actor, M: Message<Result = Responses>, { fn handle<R: ResponseChannel<M>>(self, _: &mut A::Context, tx: Option<R>) { if let Some(tx) = tx { tx.send(self); } } } // 定义参与者 struct MyActor; // 为我们的参与者提供 Actor 实现 impl Actor for MyActor { type Context = Context<Self>; fn started(&mut self, _ctx: &mut Context<Self>) { println!("Actor is alive"); } fn stopped(&mut self, _ctx: &mut Context<Self>) { println!("Actor is stopped"); } } /// 为 `Messages` 枚举定义处理程序 impl Handler<Messages> for MyActor { type Result = Responses; fn handle(&mut self, msg: Messages, _ctx: &mut Context<Self>) -> Self::Result { match msg { Messages::Ping => Responses::GotPing, Messages::Pong => Responses::GotPong, } } } #[actix_rt::main] async fn main() { // 在当前线程启动 MyActor let addr = MyActor.start(); // 发送 Ping 消息。 // send() 消息返回 Future 对象,可解析出消息结果 let ping_future = addr.send(Messages::Ping).await; let pong_future = addr.send(Messages::Pong).await; match pong_future { Ok(res) => match res { Responses::GotPing => println!("Ping received"), Responses::GotPong => println!("Pong received"), }, Err(e) => println!("Actor is probably dead: {}", e), } match ping_future { Ok(res) => match res { Responses::GotPing => println!("Ping received"), Responses::GotPong => println!("Pong received"), }, Err(e) => println!("Actor is probably dead: {}", e), } }
地址
参与者仅通过交换消息进行通信。发送方参与者可以选择等待该响应。不能直接引用参与者,只能通过其地址来引用。
有几种方式来获取参与者的地址。Actor
trait 提供了两个辅助方法来启动参与者。这两个方法都会返回所启动参与者的地址。
以下是一个 Actor::start()
方法用法的示例。在这个示例中 MyActor
参与者是异步的,并且在与调用者相同的线程中启动——线程会在
SyncArbiter 章节中介绍。
# extern crate actix; # use actix::prelude::*; # struct MyActor; impl Actor for MyActor { type Context = Context<Self>; } # fn main() { # System::new("test"); let addr = MyActor.start(); # }
异步参与者可以由 Context
结构获取其地址。该上下文需要实现
AsyncContext
trait。AsyncContext::address()
提供了参与者的地址。
# extern crate actix; # use actix::prelude::*; # struct MyActor; impl Actor for MyActor { type Context = Context<Self>; fn started(&mut self, ctx: &mut Context<Self>) { let addr = ctx.address(); } } # # fn main() {}
消息
为了能够处理指定消息,参与者必须提供这种消息的 Handler<M>
实现。
所有消息都是静态类型的。可以使用异步方式处理消息。参与者可以产生其他参与者或者将 future 或
stream 添加到执行上下文。参与者 trait 提供了几种可以控制参与者生命周期的方法。
如需向参与者发送消息,需要使用 Addr
对象。Addr
提供了几种发送消息的方式。
-
Addr::do_send(M)
——这个方法会忽略消息发送中的任何错误。如果信箱已满,那么仍会绕过限制将该消息排入队列。如果该参与者的信箱已关闭, 那么会以静默方式丢弃该消息。这个方法不会返回结果,因此信箱关闭及发生故障都无从知悉。 -
Addr::try_send(M)
——这个方法会立即尝试发送该消息。如果信箱已满或者关闭(参与者已死),那么这个方法返回SendError
。 -
Addr::send(M)
——这个消息返回一个可解析出消息处理过程的结果的 future 对象。如果返回的Future
对象被 drop,那么会取消该消息。
收信方
收信方是仅支持一种类型消息的一种地址的专用版。
可以用于需要将消息发送给不同类型的参与者的场景。
可以用 Addr::recipient()
由地址创建收信方对象。
Address objects require an actor type, but if we just want to send a specific message to an actor that can handle the message, we can use the Recipient interface.
例如,收信方可以用于订阅系统。在以下示例中,
OrderEvents
参与者向所有订阅者发送 OrderShipped
消息。订阅者可以是实现了 Handler<OrderShipped>
trait 的任何参与者。
# extern crate actix; use actix::prelude::*; #[derive(Message)] #[rtype(result = "()")] struct OrderShipped(usize); #[derive(Message)] #[rtype(result = "()")] struct Ship(usize); /// Subscribe to order shipped event. #[derive(Message)] #[rtype(result = "()")] struct Subscribe(pub Recipient<OrderShipped>); /// Actor that provides order shipped event subscriptions struct OrderEvents { subscribers: Vec<Recipient<OrderShipped>>, } impl OrderEvents { fn new() -> Self { OrderEvents { subscribers: vec![] } } } impl Actor for OrderEvents { type Context = Context<Self>; } impl OrderEvents { /// Send event to all subscribers fn notify(&mut self, order_id: usize) { for subscr in &self.subscribers { subscr.do_send(OrderShipped(order_id)); } } } /// Subscribe to shipment event impl Handler<Subscribe> for OrderEvents { type Result = (); fn handle(&mut self, msg: Subscribe, _: &mut Self::Context) { self.subscribers.push(msg.0); } } /// Subscribe to ship message impl Handler<Ship> for OrderEvents { type Result = (); fn handle(&mut self, msg: Ship, ctx: &mut Self::Context) -> Self::Result { self.notify(msg.0); System::current().stop(); } } /// Email Subscriber struct EmailSubscriber; impl Actor for EmailSubscriber { type Context = Context<Self>; } impl Handler<OrderShipped> for EmailSubscriber { type Result = (); fn handle(&mut self, msg: OrderShipped, _ctx: &mut Self::Context) -> Self::Result { println!("Email sent for order {}", msg.0) } } struct SmsSubscriber; impl Actor for SmsSubscriber { type Context = Context<Self>; } impl Handler<OrderShipped> for SmsSubscriber { type Result = (); fn handle(&mut self, msg: OrderShipped, _ctx: &mut Self::Context) -> Self::Result { println!("SMS sent for order {}", msg.0) } } fn main() { let system = System::new("events"); let email_subscriber = Subscribe(EmailSubscriber{}.start().recipient()); let sms_subscriber = Subscribe(SmsSubscriber{}.start().recipient()); let order_event = OrderEvents::new().start(); order_event.do_send(email_subscriber); order_event.do_send(sms_subscriber); order_event.do_send(Ship(1)); system.run(); }
Context
Actors all maintain an internal execution context, or state. This allows an actor to determine its own Address, change mailbox limits, or stop its execution.
Mailbox
All messages go to the actor's mailbox first, then the actor's execution context
calls specific message handlers. Mailboxes in general are bounded. The capacity is
specific to the context implementation. For the Context
type the capacity is set to
16 messages by default and can be increased with Context::set_mailbox_capacity()
.
# extern crate actix; # use actix::prelude::*; # struct MyActor; impl Actor for MyActor { type Context = Context<Self>; fn started(&mut self, ctx: &mut Self::Context) { ctx.set_mailbox_capacity(1); } } # fn main() { # System::new("test"); let addr = MyActor.start(); # }
Remember that this doesn't apply to Addr::do_send(M)
which bypasses the Mailbox queue limit, or
AsyncContext::notify(M)
and AsyncContext::notify_later(M, Duration)
which bypasses the mailbox
entirely.
Getting your actors Address
An actor can view its own address from its context. Perhaps you want to requeue an event for
later, or you want to transform the message type. Maybe you want to respond with your address
to a message. If you want an actor to send a message to itself, have a look at
AsyncContext::notify(M)
instead.
To get your address from the context you call Context::address()
. An example is:
# extern crate actix; # use actix::prelude::*; # struct MyActor; struct WhoAmI; impl Message for WhoAmI { type Result = Result<actix::Addr<MyActor>, ()>; } impl Actor for MyActor { type Context = Context<Self>; } impl Handler<WhoAmI> for MyActor { type Result = Result<actix::Addr<MyActor>, ()>; fn handle(&mut self, msg: WhoAmI, ctx: &mut Context<Self>) -> Self::Result { Ok(ctx.address()) } } # fn main() { # System::new("scratch"); # let addr = MyActor.start(); let who_addr = addr.do_send(WhoAmI{}); # }
Stopping an Actor
From within the actors execution context you can choose to stop the actor from processing
any future Mailbox messages. This could be in response to an error condition, or as part
of program shutdown. To do this you call Context::stop()
.
This is an adjusted Ping example that stops after 4 pings are received.
# extern crate actix; # extern crate actix_rt; # use actix::prelude::*; # struct MyActor { # count: usize, # } # impl Actor for MyActor { # type Context = Context<Self>; # } # # #[derive(Message)] # #[rtype(result = "usize")] # struct Ping(usize); # impl Handler<Ping> for MyActor { type Result = usize; fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result { self.count += msg.0; if self.count > 5 { println!("Shutting down ping receiver."); ctx.stop() } self.count } } #[actix_rt::main] async fn main() { // start new actor let addr = MyActor { count: 10 }.start(); // send message and get future for result let addr_2 = addr.clone(); let res = addr.send(Ping(6)).await; match res { Ok(_) => assert!(addr_2.try_send(Ping(6)).is_err()), _ => {} } }
Arbiter
Arbiter
s provide an asynchronous execution context for Actor
s, functions
and futures
. Where an
actor contains a Context
that defines its Actor specific execution state,
Arbiters host the environment where an actor runs.
As a result Arbiters perform a number of functions. Most notably, they are able to spawn a new OS thread, run an event loop, spawn tasks asynchronously on that event loop, and act as helpers for asynchronous tasks.
System and Arbiter
In all our previous code examples the function System::new
creates an Arbiter
for your actors to run inside. When you call start()
on your actor it is then
running inside of the System Arbiter's thread. In many cases, this is all you
will need for a program using Actix.
While it only uses one thread, it uses the very efficient event loop pattern
which works well for asynchronous events. To handle synchronous, CPU-bound
tasks, it's better to avoid blocking the event loop and instead offload the
computation to other threads. For this usecase, read the next section and
consider using SyncArbiter
.
The event loop
One Arbiter
is in control of one thread with one event pool. When an Arbiter
spawns a task (via Arbiter::spawn
, Context<Actor>::run_later
, or similar
constructs), the Arbiter queues the task for execution on that task queue. When
you think Arbiter
, you can think "single-threaded event loop".
Actix in general does support concurrency, but normal Arbiter
s (not
SyncArbiter
s) do not. To use Actix in a concurrent way, you can spin up
multiple Arbiter
s using Arbiter::new
, ArbiterBuilder
, or Arbiter::start
.
When you create a new Arbiter, this creates a new execution context for Actors.
The new thread is available to add new Actors to it, but Actors cannot freely
move between Arbiters: they are tied to the Arbiter they were spawned in.
However, Actors on different Arbiters can still communicate with each other
using the normal Addr
/Recipient
methods. The method of passing messages is
agnostic to whether the Actors are running on the same or different Arbiters.
Using Arbiter for resolving async events
If you aren't an expert in Rust Futures, Arbiter can be a helpful and simple
wrapper to resolving async events in order. Consider we have two actors, A and
B, and we want to run an event on B only once a result from A is completed. We
can use Arbiter::spawn
to assist with this task.
# extern crate actix; use actix::prelude::*; struct SumActor {} impl Actor for SumActor { type Context = Context<Self>; } #[derive(Message)] #[rtype(result = "usize")] struct Value(usize, usize); impl Handler<Value> for SumActor { type Result = usize; fn handle(&mut self, msg: Value, _ctx: &mut Context<Self>) -> Self::Result { msg.0 + msg.1 } } struct DisplayActor {} impl Actor for DisplayActor { type Context = Context<Self>; } #[derive(Message)] #[rtype(result = "()")] struct Display(usize); impl Handler<Display> for DisplayActor { type Result = (); fn handle(&mut self, msg: Display, _ctx: &mut Context<Self>) -> Self::Result { println!("Got {:?}", msg.0); } } fn main() { let system = System::new("single-arbiter-example"); // Define an execution flow using futures let execution = async { // `Actor::start` spawns the `Actor` on the *current* `Arbiter`, which // in this case is the System arbiter let sum_addr = SumActor {}.start(); let dis_addr = DisplayActor {}.start(); // Start by sending a `Value(6, 7)` to our `SumActor`. // `Addr::send` responds with a `Request`, which implements `Future`. // When awaited, it will resolve to a `Result<usize, MailboxError>`. let sum_result = sum_addr.send(Value(6, 7)).await; match sum_result { Ok(res) => { // `res` is now the `usize` returned from `SumActor` as a response to `Value(6, 7)` // Once the future is complete, send the successful response (`usize`) // to the `DisplayActor` wrapped in a `Display dis_addr.send(Display(res)).await; } Err(e) => { eprintln!("Encountered mailbox error: {:?}", e); } }; }; // Spawn the future onto the current Arbiter/event loop Arbiter::spawn(execution); // We only want to do one computation in this example, so we // shut down the `System` which will stop any Arbiters within // it (including the System Arbiter), which will in turn stop // any Actor Contexts running within those Arbiters, finally // shutting down all Actors. System::current().stop(); system.run(); }
SyncArbiter
When you normally run Actors, there are multiple Actors running on the System's Arbiter thread, using its event loop. However for CPU bound workloads, or highly concurrent workloads, you may wish to have an Actor running multiple instances in parallel.
This is what a SyncArbiter provides - the ability to launch multiple instances of an Actor on a pool of OS threads.
It's important to note a SyncArbiter can only host a single type of Actor. This means you need to create a SyncArbiter for each type of Actor you want to run in this manner.
Creating a Sync Actor
When implementing your Actor to be run on a SyncArbiter, it requires that your Actor's
Context is changed from Context
to SyncContext
.
# extern crate actix; use actix::prelude::*; struct MySyncActor; impl Actor for MySyncActor { type Context = SyncContext<Self>; } # # fn main() { # System::new("test"); # }
Starting the Sync Arbiter
Now that we have defined a Sync Actor, we can run it on a thread pool, created by
our SyncArbiter
. We can only control the number of threads at SyncArbiter creation
time - we can't add/remove threads later.
# extern crate actix; use actix::prelude::*; struct MySyncActor; impl Actor for MySyncActor { type Context = SyncContext<Self>; } # fn main() { # System::new("test"); let addr = SyncArbiter::start(2, || MySyncActor); # }
We can communicate with the addr the same way as we have with our previous Actors that we started. We can send messages, receive futures and results, and more.
Sync Actor Mailboxes
Sync Actors have no Mailbox limits, but you should still use do_send
, try_send
and send
as normal to account for other possible errors or sync vs async behavior.
WIP
WIP
WIP
WIP
WIP