playwright/imp/core/
event_emitter.rs1use crate::imp::{core::*, prelude::*};
2pub(crate) use tokio::sync::{broadcast, broadcast::error::TryRecvError};
3
4pub trait EventEmitter {
5 type Event: Clone;
6
7 fn tx(&self) -> Option<broadcast::Sender<Self::Event>>;
8
9 fn set_tx(&self, tx: broadcast::Sender<Self::Event>);
10
11 fn new_tx(
12 &self
13 ) -> (
14 broadcast::Sender<Self::Event>,
15 broadcast::Receiver<Self::Event>
16 ) {
17 broadcast::channel(64)
18 }
19
20 fn subscribe_event(&self) -> broadcast::Receiver<Self::Event> {
21 if let Some(tx) = self.tx() {
22 tx.subscribe()
23 } else {
24 let (tx, rx) = self.new_tx();
25 self.set_tx(tx);
26 rx
27 }
28 }
29
30 fn emit_event<E: Into<Self::Event>>(&self, e: E) { self.tx().map(|tx| tx.send(e.into()).ok()); }
31}
32
/// An event value that can report which kind of event it is.
pub(crate) trait IsEvent: Clone {
    /// Tag identifying the kind of an event; copyable and comparable with `==`.
    type EventType: PartialEq + Copy + Clone;

    /// Returns the kind tag of this event.
    fn event_type(&self) -> Self::EventType;
}
38
/// Waits for the next event of kind `evt` on `rx`, giving up after `timeout`
/// milliseconds.
///
/// `consume` is run on `rx` first, so only events received after that point can
/// satisfy the wait. Events of any other kind are skipped.
///
/// # Errors
///
/// * `Error::Timeout` when the timer completes before a matching event arrives.
/// * `Error::Event` when receiving from the channel fails.
#[cfg(any(feature = "rt-tokio", feature = "rt-actix"))]
pub(crate) async fn expect_event<E>(
    mut rx: broadcast::Receiver<E>,
    evt: E::EventType,
    timeout: u32
) -> Result<E, Error>
where
    // These bounds are not required by the body any more; they are kept so the
    // signature callers compile against stays unchanged.
    E: IsEvent + Send + Sync + 'static,
    <E as event_emitter::IsEvent>::EventType: Send + Sync
{
    consume(&mut rx).await?;
    let deadline = sleep(Duration::from_millis(u64::from(timeout)));
    // The wait loop is polled as part of this future instead of on a spawned
    // task. When the timer wins the race the loop is dropped and stops; a
    // spawned task whose handle is merely dropped would keep receiving events
    // after the caller had already been handed `Error::Timeout`.
    let matching = async move {
        loop {
            match rx.recv().await {
                Ok(x) if x.event_type() == evt => break Ok(x),
                Ok(_) => continue,
                Err(e) => break Err(e)
            }
        }
    };
    tokio::select! {
        _ = deadline => Err(Error::Timeout),
        x = matching => x.map_err(Error::Event)
    }
}
65
/// Waits for the next event of kind `evt` on `rx`, giving up after `timeout`
/// milliseconds.
///
/// `consume` is run on `rx` first, so only events received after that point can
/// satisfy the wait. Events of any other kind are skipped.
///
/// # Errors
///
/// * `Error::Timeout` when the timer completes before a matching event arrives.
/// * `Error::Event` when receiving from the channel fails.
#[cfg(feature = "rt-async-std")]
pub(crate) async fn expect_event<E>(
    mut rx: broadcast::Receiver<E>,
    evt: E::EventType,
    timeout: u32
) -> Result<E, Error>
where
    // These bounds are not required by the body any more; they are kept so the
    // signature callers compile against stays unchanged.
    E: IsEvent + Send + Sync + 'static,
    <E as event_emitter::IsEvent>::EventType: Send + Sync
{
    consume(&mut rx).await?;
    let deadline = sleep(Duration::from_millis(u64::from(timeout)));
    // The wait loop is polled as part of this future instead of on a spawned
    // task. When the timer wins the race the loop is dropped and stops; a
    // spawned task whose handle is merely dropped would keep receiving events
    // after the caller had already been handed `Error::Timeout`.
    let matching = async move {
        loop {
            match rx.recv().await {
                Ok(x) if x.event_type() == evt => break Ok(x),
                Ok(_) => continue,
                Err(e) => break Err(e)
            }
        }
    };
    tokio::select! {
        _ = deadline => Err(Error::Timeout),
        x = matching => x.map_err(Error::Event)
    }
}
92
93async fn consume<E>(rx: &mut broadcast::Receiver<E>) -> Result<(), Error>
94where
95 E: IsEvent
96{
97 loop {
98 match rx.try_recv() {
99 Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break,
100 _ => {}
101 }
102 }
103 Ok(())
104}
105
#[cfg(test)]
mod tests {
    // Sanity check that `tokio::select!` works with the prelude's `sleep`:
    // of two timers, the 200 ms one must complete before the 400 ms one.
    crate::runtime_test!(select, {
        use crate::imp::prelude::*;
        let short_timer = sleep(Duration::from_millis(200));
        let long_timer = sleep(Duration::from_millis(400));
        tokio::select! {
            _ = short_timer => {},
            _ = long_timer => unreachable!()
        }
    });
}
117}