playwright/imp/core/
event_emitter.rs

1use crate::imp::{core::*, prelude::*};
2pub(crate) use tokio::sync::{broadcast, broadcast::error::TryRecvError};
3
4pub trait EventEmitter {
5    type Event: Clone;
6
7    fn tx(&self) -> Option<broadcast::Sender<Self::Event>>;
8
9    fn set_tx(&self, tx: broadcast::Sender<Self::Event>);
10
11    fn new_tx(
12        &self
13    ) -> (
14        broadcast::Sender<Self::Event>,
15        broadcast::Receiver<Self::Event>
16    ) {
17        broadcast::channel(64)
18    }
19
20    fn subscribe_event(&self) -> broadcast::Receiver<Self::Event> {
21        if let Some(tx) = self.tx() {
22            tx.subscribe()
23        } else {
24            let (tx, rx) = self.new_tx();
25            self.set_tx(tx);
26            rx
27        }
28    }
29
30    fn emit_event<E: Into<Self::Event>>(&self, e: E) { self.tx().map(|tx| tx.send(e.into()).ok()); }
31}
32
33pub(crate) trait IsEvent: Clone {
34    type EventType: Clone + Copy + PartialEq;
35
36    fn event_type(&self) -> Self::EventType;
37}
38
39#[cfg(any(feature = "rt-tokio", feature = "rt-actix"))]
40pub(crate) async fn expect_event<E>(
41    mut rx: broadcast::Receiver<E>,
42    evt: E::EventType,
43    timeout: u32
44) -> Result<E, Error>
45where
46    E: IsEvent + Send + Sync + 'static,
47    <E as event_emitter::IsEvent>::EventType: Send + Sync
48{
49    consume(&mut rx).await?;
50    let sleep = sleep(Duration::from_millis(timeout as u64));
51    let event = spawn(async move {
52        loop {
53            match rx.recv().await {
54                Ok(x) if x.event_type() == evt => break Ok(x),
55                Ok(_) => continue,
56                Err(e) => break Err(e)
57            }
58        }
59    });
60    tokio::select! {
61        _ = sleep => Err(Error::Timeout),
62        x = event => x?.map_err(Error::Event)
63    }
64}
65
66#[cfg(feature = "rt-async-std")]
67pub(crate) async fn expect_event<E>(
68    mut rx: broadcast::Receiver<E>,
69    evt: E::EventType,
70    timeout: u32
71) -> Result<E, Error>
72where
73    E: IsEvent + Send + Sync + 'static,
74    <E as event_emitter::IsEvent>::EventType: Send + Sync
75{
76    consume(&mut rx).await?;
77    let sleep = sleep(Duration::from_millis(timeout as u64));
78    let event = spawn(async move {
79        loop {
80            match rx.recv().await {
81                Ok(x) if x.event_type() == evt => break Ok(x),
82                Ok(_) => continue,
83                Err(e) => break Err(e)
84            }
85        }
86    });
87    tokio::select! {
88        _ = sleep => Err(Error::Timeout),
89        x = event => x.map_err(Error::Event)
90    }
91}
92
93async fn consume<E>(rx: &mut broadcast::Receiver<E>) -> Result<(), Error>
94where
95    E: IsEvent
96{
97    loop {
98        match rx.try_recv() {
99            Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break,
100            _ => {}
101        }
102    }
103    Ok(())
104}
105
106#[cfg(test)]
107mod tests {
108    crate::runtime_test!(select, {
109        use crate::imp::prelude::*;
110        let first = sleep(Duration::from_millis(200u64));
111        let second = sleep(Duration::from_millis(400u64));
112        tokio::select! {
113            _ = first => {},
114            _ = second => unreachable!()
115        }
116    });
117}