地址
参与者仅通过交换消息进行通信。发送方参与者可以选择等待该响应。不能直接引用参与者,只能通过其地址来引用。
有几种方式来获取参与者的地址。Actor
trait 提供了两个辅助方法来启动参与者。这两个方法都会返回所启动参与者的地址。
以下是一个 Actor::start()
方法用法的示例。在这个示例中 MyActor
参与者是异步的,并且在与调用者相同的线程中启动——线程会在
SyncArbiter 章节中介绍。
# extern crate actix; # use actix::prelude::*; # struct MyActor; impl Actor for MyActor { type Context = Context<Self>; } # fn main() { # System::new("test"); let addr = MyActor.start(); # }
异步参与者可以由 Context
结构获取其地址。该上下文需要实现
AsyncContext
trait。AsyncContext::address()
提供了参与者的地址。
# extern crate actix; # use actix::prelude::*; # struct MyActor; impl Actor for MyActor { type Context = Context<Self>; fn started(&mut self, ctx: &mut Context<Self>) { let addr = ctx.address(); } } # # fn main() {}
消息
为了能够处理指定消息,参与者必须提供这种消息的 Handler<M>
实现。
所有消息都是静态类型的。可以使用异步方式处理消息。参与者可以产生其他参与者或者将 future 或
stream 添加到执行上下文。参与者 trait 提供了几种可以控制参与者生命周期的方法。
如需向参与者发送消息,需要使用 Addr
对象。Addr
提供了几种发送消息的方式。
-
Addr::do_send(M)
——这个方法会忽略消息发送中的任何错误。如果信箱已满,那么仍会绕过限制将该消息排入队列。如果该参与者的信箱已关闭, 那么会以静默方式丢弃该消息。这个方法不会返回结果,因此信箱关闭及发生故障都无从知悉。 -
Addr::try_send(M)
——这个方法会立即尝试发送该消息。如果信箱已满或者关闭(参与者已死),那么这个方法返回SendError
。 -
Addr::send(M)
——这个消息返回一个可解析出消息处理过程的结果的 future 对象。如果返回的Future
对象被 drop,那么会取消该消息。
收信方
收信方是仅支持一种类型消息的一种地址的专用版。
可以用于需要将消息发送给不同类型的参与者的场景。
可以用 Addr::recipient()
由地址创建收信方对象。
Address objects require an actor type, but if we just want to send a specific message to an actor that can handle the message, we can use the Recipient interface.
例如,收信方可以用于订阅系统。在以下示例中,
OrderEvents
参与者向所有订阅者发送 OrderShipped
消息。订阅者可以是实现了 Handler<OrderShipped>
trait 的任何参与者。
# extern crate actix; use actix::prelude::*; #[derive(Message)] #[rtype(result = "()")] struct OrderShipped(usize); #[derive(Message)] #[rtype(result = "()")] struct Ship(usize); /// Subscribe to order shipped event. #[derive(Message)] #[rtype(result = "()")] struct Subscribe(pub Recipient<OrderShipped>); /// Actor that provides order shipped event subscriptions struct OrderEvents { subscribers: Vec<Recipient<OrderShipped>>, } impl OrderEvents { fn new() -> Self { OrderEvents { subscribers: vec![] } } } impl Actor for OrderEvents { type Context = Context<Self>; } impl OrderEvents { /// Send event to all subscribers fn notify(&mut self, order_id: usize) { for subscr in &self.subscribers { subscr.do_send(OrderShipped(order_id)); } } } /// Subscribe to shipment event impl Handler<Subscribe> for OrderEvents { type Result = (); fn handle(&mut self, msg: Subscribe, _: &mut Self::Context) { self.subscribers.push(msg.0); } } /// Subscribe to ship message impl Handler<Ship> for OrderEvents { type Result = (); fn handle(&mut self, msg: Ship, ctx: &mut Self::Context) -> Self::Result { self.notify(msg.0); System::current().stop(); } } /// Email Subscriber struct EmailSubscriber; impl Actor for EmailSubscriber { type Context = Context<Self>; } impl Handler<OrderShipped> for EmailSubscriber { type Result = (); fn handle(&mut self, msg: OrderShipped, _ctx: &mut Self::Context) -> Self::Result { println!("Email sent for order {}", msg.0) } } struct SmsSubscriber; impl Actor for SmsSubscriber { type Context = Context<Self>; } impl Handler<OrderShipped> for SmsSubscriber { type Result = (); fn handle(&mut self, msg: OrderShipped, _ctx: &mut Self::Context) -> Self::Result { println!("SMS sent for order {}", msg.0) } } fn main() { let system = System::new("events"); let email_subscriber = Subscribe(EmailSubscriber{}.start().recipient()); let sms_subscriber = Subscribe(SmsSubscriber{}.start().recipient()); let order_event = OrderEvents::new().start(); order_event.do_send(email_subscriber); order_event.do_send(sms_subscriber); order_event.do_send(Ship(1)); system.run(); }