playwright/imp/core/
connection.rs

1use super::driver::Driver;
2use crate::imp::{core::*, prelude::*};
3use std::{
4    io,
5    process::{Child, Command, Stdio},
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        TryLockError,
9    },
10};
11
12#[derive(Debug)]
13pub(crate) struct Context {
14    objects: HashMap<Str<Guid>, RemoteArc>,
15    ctx: Wm<Context>,
16    id: i32,
17    callbacks: HashMap<i32, WaitPlaces<WaitMessageResult>>,
18    writer: Writer,
19}
20
21#[derive(Debug)]
22pub(crate) struct Connection {
23    _child: Child,
24    ctx: Am<Context>,
25    reader: Am<Reader>,
26    should_stop: Arc<AtomicBool>,
27}
28
29#[derive(thiserror::Error, Debug)]
30pub enum Error {
31    #[error(transparent)]
32    Io(#[from] io::Error),
33    #[error("Failed to initialize")]
34    InitializationError,
35    #[error("Disconnected")]
36    ReceiverClosed,
37    #[error("Invalid message")]
38    InvalidParams,
39    #[error("Object not found")]
40    ObjectNotFound,
41    #[error(transparent)]
42    Serde(#[from] serde_json::Error),
43    #[error("Failed to send")]
44    Channel,
45    #[error(transparent)]
46    Transport(#[from] TransportError),
47    #[error("Callback not found")]
48    CallbackNotFound,
49    #[error(transparent)]
50    ErrorResponded(#[from] Arc<ErrorMessage>),
51    #[error("Value is not Object")]
52    NotObject,
53    #[error("guid not found in {0:?}")]
54    GuidNotFound(Value),
55    #[error(transparent)]
56    InvalidBase64(#[from] base64::DecodeError),
57    #[error(transparent)]
58    InvalidUtf8(#[from] std::string::FromUtf8Error),
59    #[error(transparent)]
60    SerializationPwJson(#[from] ser::Error),
61    #[error(transparent)]
62    DeserializationPwJson(#[from] de::Error),
63    #[error(transparent)]
64    Arc(#[from] Arc<Error>),
65    #[error(transparent)]
66    Event(#[from] broadcast::error::RecvError),
67    #[error("Path is not available when using BrowserType.connect(). Use save_as() to save a local copy.")]
68    RemoteArtifact,
69    #[error("Failed to resolve path {0:?}")]
70    ResolvePath(PathBuf),
71    #[error("Timed out")]
72    Timeout,
73    #[error(transparent)]
74    Join(#[from] JoinError),
75}
76
77pub(crate) type ArcResult<T> = Result<T, Arc<Error>>;
78
79impl Drop for Connection {
80    fn drop(&mut self) {
81        self.notify_closed(Error::ReceiverClosed);
82        self.should_stop.store(true, Ordering::Relaxed);
83    }
84}
85
86impl Connection {
87    fn try_new(driver: &Driver) -> io::Result<Connection> {
88        // For Playwright 1.50+, we run: node package/cli.js run-driver
89        let executable = driver.executable();
90        let cli_script = driver.cli_script();
91        let mut child = Command::new(&executable)
92            .arg(&cli_script)
93            .args(&["run-driver"])
94            .stdin(Stdio::piped())
95            .stdout(Stdio::piped())
96            .stderr(Stdio::null())
97            .spawn()?;
98        // TODO: env "NODE_OPTIONS"
99        let stdin = child.stdin.take().unwrap();
100        let stdout = child.stdout.take().unwrap();
101        let reader = Reader::new(stdout);
102        let writer = Writer::new(stdin);
103        let ctx = Context::new(writer);
104        Ok(Self {
105            _child: child,
106            ctx,
107            should_stop: Arc::new(false.into()),
108            reader: Arc::new(Mutex::new(reader)),
109        })
110    }
111
112    pub(crate) fn run(driver: &Driver) -> io::Result<Connection> {
113        let conn = Self::try_new(driver)?;
114
115        // Playwright 1.50+ requires an "initialize" message before sending objects
116        {
117            let empty_guid: &S<Guid> = S::validate("").unwrap();
118            let initialize_method: &S<Method> = S::validate("initialize").unwrap();
119            let mut params = Map::new();
120            params.insert(
121                "sdkLanguage".to_string(),
122                Value::String("javascript".to_string()),
123            );
124
125            let req = Req {
126                id: 0,
127                guid: empty_guid,
128                method: initialize_method,
129                params,
130                metadata: Map::new(),
131            };
132
133            let mut ctx = conn.ctx.lock().unwrap();
134            ctx.writer
135                .send(&req)
136                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
137        }
138
139        conn.start();
140        Ok(conn)
141    }
142
143    fn start(&self) {
144        let c2 = Arc::downgrade(&self.ctx);
145        let r2 = Arc::downgrade(&self.reader);
146        let s2 = Arc::downgrade(&self.should_stop);
147        std::thread::spawn(move || {
148            let c = c2;
149            let r = r2;
150            let s = s2;
151            log::trace!("succcess starting connection");
152            let status = (|| -> Result<(), Error> {
153                loop {
154                    let response = {
155                        let r = match r.upgrade() {
156                            Some(x) => x,
157                            None => break,
158                        };
159                        let mut reader = match r.try_lock() {
160                            Ok(x) => x,
161                            Err(TryLockError::WouldBlock) => continue,
162                            Err(e) => Err(e).unwrap(),
163                        };
164                        match reader.try_read()? {
165                            Some(x) => x,
166                            None => continue,
167                        }
168                    };
169                    {
170                        let s = match s.upgrade() {
171                            Some(x) => x,
172                            None => break,
173                        };
174                        let should_stop = s.load(Ordering::Relaxed);
175                        if should_stop {
176                            break;
177                        }
178                    }
179                    // dispatch
180                    {
181                        let c = match c.upgrade() {
182                            Some(x) => x,
183                            None => break,
184                        };
185                        let mut ctx = c.lock().unwrap();
186                        ctx.dispatch(response)?;
187                        // log::debug!("{:?}", ctx.objects.keys());
188                    }
189                }
190                Ok(())
191            })();
192            if let Err(e) = status {
193                log::trace!("Failed with {:?}", e);
194                if let Some(c) = c.upgrade() {
195                    let mut ctx = c.lock().unwrap();
196                    ctx.notify_closed(e);
197                }
198            } else {
199                log::trace!("Done");
200            }
201        });
202    }
203
204    pub(crate) fn context(&self) -> Wm<Context> {
205        Arc::downgrade(&self.ctx)
206    }
207
208    fn notify_closed(&mut self, e: Error) {
209        let ctx = &mut self.ctx.lock().unwrap();
210        ctx.notify_closed(e);
211    }
212}
213
214impl Context {
215    fn new(writer: Writer) -> Am<Context> {
216        let objects = {
217            let mut d = HashMap::new();
218            let root = RootObject::new();
219            d.insert(root.guid().to_owned(), RemoteArc::Root(Arc::new(root)));
220            d
221        };
222        let ctx = Context {
223            objects,
224            ctx: Weak::new(),
225            id: 0,
226            callbacks: HashMap::new(),
227            writer,
228        };
229        let am = Arc::new(Mutex::new(ctx));
230        am.lock().unwrap().ctx = Arc::downgrade(&am);
231        am
232    }
233
234    fn notify_closed(&mut self, e: Error) {
235        let err = Arc::new(e);
236        for p in self.callbacks.iter().map(|(_, v)| v) {
237            Context::respond_wait(p, Err(err.clone()));
238        }
239        self.objects = HashMap::new();
240    }
241
242    fn dispatch(&mut self, msg: Res) -> Result<(), Error> {
243        match msg {
244            Res::Result(msg) => {
245                // id=0 is the initialize message response, we don't need a callback for it
246                if msg.id == 0 {
247                    return Ok(());
248                }
249                let p = self.callbacks.get(&msg.id).ok_or(Error::CallbackNotFound)?;
250                Self::respond_wait(p, Ok(msg.body.map(Arc::new).map_err(Arc::new)));
251                return Ok(());
252            }
253            Res::Initial(msg) => {
254                if Method::is_create(&msg.method) {
255                    self.create_remote_object(&msg.guid, msg.params.clone())?;
256                    return Ok(());
257                }
258                if Method::is_dispose(&msg.method) {
259                    self.dispose(&msg.guid);
260                    return Ok(());
261                }
262                let target = self.objects.get(&msg.guid).ok_or(Error::ObjectNotFound)?;
263                let ResInitial { method, params, .. } = msg;
264                target.handle_event(self, method, params)?;
265            }
266        }
267        Ok(())
268    }
269
270    fn dispose(&mut self, i: &S<Guid>) {
271        let a = match self.objects.get(i) {
272            None => return,
273            Some(a) => a,
274        };
275        let cs = a.channel().children();
276        for c in cs {
277            let c = match c.upgrade() {
278                None => continue,
279                Some(c) => c,
280            };
281            self.dispose(&c.channel().guid);
282        }
283        self.remove_object(i);
284    }
285
286    fn respond_wait(
287        WaitPlaces { value, waker }: &WaitPlaces<WaitMessageResult>,
288        result: WaitMessageResult,
289    ) {
290        let place = match value.upgrade() {
291            Some(p) => p,
292            None => return,
293        };
294        let waker = match waker.upgrade() {
295            Some(x) => x,
296            None => return,
297        };
298        *place.lock().unwrap() = Some(result);
299        let waker: &Option<Waker> = &waker.lock().unwrap();
300        let waker = match waker {
301            Some(x) => x.clone(),
302            None => return,
303        };
304        waker.wake();
305    }
306
307    fn create_remote_object(
308        &mut self,
309        parent: &S<Guid>,
310        params: Map<String, Value>,
311    ) -> Result<(), Error> {
312        let CreateParams {
313            typ,
314            guid,
315            initializer,
316        } = serde_json::from_value(params.into())?;
317        let parent = self.objects.get(parent).ok_or(Error::ObjectNotFound)?;
318        let c = ChannelOwner::new(
319            self.ctx.clone(),
320            parent.downgrade(),
321            typ.to_owned(),
322            guid.to_owned(),
323            initializer,
324        );
325        let r = RemoteArc::try_new(&typ, self, c)?;
326        parent.channel().push_child(r.downgrade());
327        self.objects.insert(guid, r.clone());
328        match r {
329            RemoteArc::Page(p) => {
330                p.hook_created(Arc::downgrade(&p))?;
331            }
332            RemoteArc::Frame(f) => {
333                f.hook_created(Arc::downgrade(&f))?;
334            }
335            _ => (),
336        }
337        Ok(())
338    }
339
340    pub(in crate::imp) fn find_object(&self, k: &S<Guid>) -> Option<RemoteWeak> {
341        self.objects.get(k).map(|r| r.downgrade())
342    }
343
344    pub(in crate::imp) fn remove_object(&mut self, k: &S<Guid>) {
345        self.objects.remove(k);
346    }
347
348    pub(in crate::imp::core) fn send_message(&mut self, r: RequestBody) -> Result<(), Error> {
349        self.id += 1;
350        let RequestBody {
351            guid,
352            method,
353            params,
354            place,
355        } = r;
356        self.callbacks.insert(self.id, place);
357        let req = Req {
358            guid: &guid,
359            method: &method,
360            params,
361            id: self.id,
362            metadata: Map::new(),
363        };
364        self.writer.send(&req)?;
365        Ok(())
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use crate::imp::core::*;
372
373    crate::runtime_test!(start, {
374        let driver = Driver::install().unwrap();
375        let conn = Connection::try_new(&driver).unwrap();
376        Connection::start(&conn);
377    });
378}