1use super::driver::Driver;
2use crate::imp::{core::*, prelude::*};
3use std::{
4 io,
5 process::{Child, Command, Stdio},
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 TryLockError,
9 },
10};
11
/// Shared state of one driver connection: the registry of remote objects,
/// the wait places of sent requests, and the writer used to send requests.
#[derive(Debug)]
pub(crate) struct Context {
    /// Remote objects known to this context, keyed by guid.
    objects: HashMap<Str<Guid>, RemoteArc>,
    /// Weak self-reference; `Context::new` fills it in right after wrapping
    /// the context in its `Arc<Mutex<_>>`.
    ctx: Wm<Context>,
    /// Id of the most recently sent request; `send_message` increments it
    /// before each send.
    id: i32,
    /// Wait places registered by `send_message`, keyed by request id.
    callbacks: HashMap<i32, WaitPlaces<WaitMessageResult>>,
    /// Sink for outgoing requests.
    writer: Writer,
}
20
/// Handle to a spawned driver process plus the state needed to talk to it.
#[derive(Debug)]
pub(crate) struct Connection {
    /// The spawned driver process; stored here but not otherwise used in the
    /// code visible in this file.
    _child: Child,
    /// Shared context (objects, callbacks, writer).
    ctx: Am<Context>,
    /// Reader over the child's stdout, consumed by the thread spawned in `start`.
    reader: Am<Reader>,
    /// Set to `true` when the connection is dropped; the reader thread checks
    /// it after each message it reads.
    should_stop: Arc<AtomicBool>,
}
28
/// Errors produced by the connection layer.
///
/// Variants marked `transparent` forward `Display` and `source` to the
/// wrapped error and can be created with `?` through their `#[from]` impl.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Wraps an `io::Error`.
    #[error(transparent)]
    Io(#[from] io::Error),
    /// Initialization failed.
    #[error("Failed to initialize")]
    InitializationError,
    /// The connection is gone; `Connection`'s `Drop` sends this to every
    /// registered waiter.
    #[error("Disconnected")]
    ReceiverClosed,
    /// A message did not have the expected shape.
    #[error("Invalid message")]
    InvalidParams,
    /// A guid did not match any object in the context's registry.
    #[error("Object not found")]
    ObjectNotFound,
    /// Wraps a `serde_json::Error`.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
    /// Sending failed.
    #[error("Failed to send")]
    Channel,
    /// Wraps a `TransportError`.
    #[error(transparent)]
    Transport(#[from] TransportError),
    /// A response arrived whose id has no registered callback.
    #[error("Callback not found")]
    CallbackNotFound,
    /// Wraps a shared `ErrorMessage`.
    #[error(transparent)]
    ErrorResponded(#[from] Arc<ErrorMessage>),
    /// A value was expected to be an object but was not.
    #[error("Value is not Object")]
    NotObject,
    /// No guid was found in the given value.
    #[error("guid not found in {0:?}")]
    GuidNotFound(Value),
    /// Wraps a `base64::DecodeError`.
    #[error(transparent)]
    InvalidBase64(#[from] base64::DecodeError),
    /// Wraps a `std::string::FromUtf8Error`.
    #[error(transparent)]
    InvalidUtf8(#[from] std::string::FromUtf8Error),
    /// Wraps a `ser::Error`.
    #[error(transparent)]
    SerializationPwJson(#[from] ser::Error),
    /// Wraps a `de::Error`.
    #[error(transparent)]
    DeserializationPwJson(#[from] de::Error),
    /// Wraps an already shared `Error`.
    #[error(transparent)]
    Arc(#[from] Arc<Error>),
    /// Wraps a `broadcast::error::RecvError`.
    #[error(transparent)]
    Event(#[from] broadcast::error::RecvError),
    /// A local path was requested for an artifact that only exists remotely.
    #[error("Path is not available when using BrowserType.connect(). Use save_as() to save a local copy.")]
    RemoteArtifact,
    /// The given path could not be resolved.
    #[error("Failed to resolve path {0:?}")]
    ResolvePath(PathBuf),
    /// An operation timed out.
    #[error("Timed out")]
    Timeout,
    /// Wraps a `JoinError`.
    #[error(transparent)]
    Join(#[from] JoinError),
}
76
/// `Result` alias whose error side is a reference-counted [`Error`].
pub(crate) type ArcResult<T> = Result<T, Arc<Error>>;
78
impl Drop for Connection {
    /// Notifies every registered waiter with [`Error::ReceiverClosed`] and
    /// then raises the stop flag that the reader thread checks.
    fn drop(&mut self) {
        // Locks the context, wakes the waiters and empties the object
        // registry (see `Context::notify_closed`).
        self.notify_closed(Error::ReceiverClosed);
        self.should_stop.store(true, Ordering::Relaxed);
    }
}
85
86impl Connection {
87 fn try_new(driver: &Driver) -> io::Result<Connection> {
88 let executable = driver.executable();
90 let cli_script = driver.cli_script();
91 let mut child = Command::new(&executable)
92 .arg(&cli_script)
93 .args(&["run-driver"])
94 .stdin(Stdio::piped())
95 .stdout(Stdio::piped())
96 .stderr(Stdio::null())
97 .spawn()?;
98 let stdin = child.stdin.take().unwrap();
100 let stdout = child.stdout.take().unwrap();
101 let reader = Reader::new(stdout);
102 let writer = Writer::new(stdin);
103 let ctx = Context::new(writer);
104 Ok(Self {
105 _child: child,
106 ctx,
107 should_stop: Arc::new(false.into()),
108 reader: Arc::new(Mutex::new(reader)),
109 })
110 }
111
112 pub(crate) fn run(driver: &Driver) -> io::Result<Connection> {
113 let conn = Self::try_new(driver)?;
114
115 {
117 let empty_guid: &S<Guid> = S::validate("").unwrap();
118 let initialize_method: &S<Method> = S::validate("initialize").unwrap();
119 let mut params = Map::new();
120 params.insert(
121 "sdkLanguage".to_string(),
122 Value::String("javascript".to_string()),
123 );
124
125 let req = Req {
126 id: 0,
127 guid: empty_guid,
128 method: initialize_method,
129 params,
130 metadata: Map::new(),
131 };
132
133 let mut ctx = conn.ctx.lock().unwrap();
134 ctx.writer
135 .send(&req)
136 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
137 }
138
139 conn.start();
140 Ok(conn)
141 }
142
143 fn start(&self) {
144 let c2 = Arc::downgrade(&self.ctx);
145 let r2 = Arc::downgrade(&self.reader);
146 let s2 = Arc::downgrade(&self.should_stop);
147 std::thread::spawn(move || {
148 let c = c2;
149 let r = r2;
150 let s = s2;
151 log::trace!("succcess starting connection");
152 let status = (|| -> Result<(), Error> {
153 loop {
154 let response = {
155 let r = match r.upgrade() {
156 Some(x) => x,
157 None => break,
158 };
159 let mut reader = match r.try_lock() {
160 Ok(x) => x,
161 Err(TryLockError::WouldBlock) => continue,
162 Err(e) => Err(e).unwrap(),
163 };
164 match reader.try_read()? {
165 Some(x) => x,
166 None => continue,
167 }
168 };
169 {
170 let s = match s.upgrade() {
171 Some(x) => x,
172 None => break,
173 };
174 let should_stop = s.load(Ordering::Relaxed);
175 if should_stop {
176 break;
177 }
178 }
179 {
181 let c = match c.upgrade() {
182 Some(x) => x,
183 None => break,
184 };
185 let mut ctx = c.lock().unwrap();
186 ctx.dispatch(response)?;
187 }
189 }
190 Ok(())
191 })();
192 if let Err(e) = status {
193 log::trace!("Failed with {:?}", e);
194 if let Some(c) = c.upgrade() {
195 let mut ctx = c.lock().unwrap();
196 ctx.notify_closed(e);
197 }
198 } else {
199 log::trace!("Done");
200 }
201 });
202 }
203
204 pub(crate) fn context(&self) -> Wm<Context> {
205 Arc::downgrade(&self.ctx)
206 }
207
208 fn notify_closed(&mut self, e: Error) {
209 let ctx = &mut self.ctx.lock().unwrap();
210 ctx.notify_closed(e);
211 }
212}
213
214impl Context {
215 fn new(writer: Writer) -> Am<Context> {
216 let objects = {
217 let mut d = HashMap::new();
218 let root = RootObject::new();
219 d.insert(root.guid().to_owned(), RemoteArc::Root(Arc::new(root)));
220 d
221 };
222 let ctx = Context {
223 objects,
224 ctx: Weak::new(),
225 id: 0,
226 callbacks: HashMap::new(),
227 writer,
228 };
229 let am = Arc::new(Mutex::new(ctx));
230 am.lock().unwrap().ctx = Arc::downgrade(&am);
231 am
232 }
233
234 fn notify_closed(&mut self, e: Error) {
235 let err = Arc::new(e);
236 for p in self.callbacks.iter().map(|(_, v)| v) {
237 Context::respond_wait(p, Err(err.clone()));
238 }
239 self.objects = HashMap::new();
240 }
241
242 fn dispatch(&mut self, msg: Res) -> Result<(), Error> {
243 match msg {
244 Res::Result(msg) => {
245 if msg.id == 0 {
247 return Ok(());
248 }
249 let p = self.callbacks.get(&msg.id).ok_or(Error::CallbackNotFound)?;
250 Self::respond_wait(p, Ok(msg.body.map(Arc::new).map_err(Arc::new)));
251 return Ok(());
252 }
253 Res::Initial(msg) => {
254 if Method::is_create(&msg.method) {
255 self.create_remote_object(&msg.guid, msg.params.clone())?;
256 return Ok(());
257 }
258 if Method::is_dispose(&msg.method) {
259 self.dispose(&msg.guid);
260 return Ok(());
261 }
262 let target = self.objects.get(&msg.guid).ok_or(Error::ObjectNotFound)?;
263 let ResInitial { method, params, .. } = msg;
264 target.handle_event(self, method, params)?;
265 }
266 }
267 Ok(())
268 }
269
270 fn dispose(&mut self, i: &S<Guid>) {
271 let a = match self.objects.get(i) {
272 None => return,
273 Some(a) => a,
274 };
275 let cs = a.channel().children();
276 for c in cs {
277 let c = match c.upgrade() {
278 None => continue,
279 Some(c) => c,
280 };
281 self.dispose(&c.channel().guid);
282 }
283 self.remove_object(i);
284 }
285
286 fn respond_wait(
287 WaitPlaces { value, waker }: &WaitPlaces<WaitMessageResult>,
288 result: WaitMessageResult,
289 ) {
290 let place = match value.upgrade() {
291 Some(p) => p,
292 None => return,
293 };
294 let waker = match waker.upgrade() {
295 Some(x) => x,
296 None => return,
297 };
298 *place.lock().unwrap() = Some(result);
299 let waker: &Option<Waker> = &waker.lock().unwrap();
300 let waker = match waker {
301 Some(x) => x.clone(),
302 None => return,
303 };
304 waker.wake();
305 }
306
307 fn create_remote_object(
308 &mut self,
309 parent: &S<Guid>,
310 params: Map<String, Value>,
311 ) -> Result<(), Error> {
312 let CreateParams {
313 typ,
314 guid,
315 initializer,
316 } = serde_json::from_value(params.into())?;
317 let parent = self.objects.get(parent).ok_or(Error::ObjectNotFound)?;
318 let c = ChannelOwner::new(
319 self.ctx.clone(),
320 parent.downgrade(),
321 typ.to_owned(),
322 guid.to_owned(),
323 initializer,
324 );
325 let r = RemoteArc::try_new(&typ, self, c)?;
326 parent.channel().push_child(r.downgrade());
327 self.objects.insert(guid, r.clone());
328 match r {
329 RemoteArc::Page(p) => {
330 p.hook_created(Arc::downgrade(&p))?;
331 }
332 RemoteArc::Frame(f) => {
333 f.hook_created(Arc::downgrade(&f))?;
334 }
335 _ => (),
336 }
337 Ok(())
338 }
339
340 pub(in crate::imp) fn find_object(&self, k: &S<Guid>) -> Option<RemoteWeak> {
341 self.objects.get(k).map(|r| r.downgrade())
342 }
343
344 pub(in crate::imp) fn remove_object(&mut self, k: &S<Guid>) {
345 self.objects.remove(k);
346 }
347
348 pub(in crate::imp::core) fn send_message(&mut self, r: RequestBody) -> Result<(), Error> {
349 self.id += 1;
350 let RequestBody {
351 guid,
352 method,
353 params,
354 place,
355 } = r;
356 self.callbacks.insert(self.id, place);
357 let req = Req {
358 guid: &guid,
359 method: &method,
360 params,
361 id: self.id,
362 metadata: Map::new(),
363 };
364 self.writer.send(&req)?;
365 Ok(())
366 }
367}
368
#[cfg(test)]
mod tests {
    use crate::imp::core::*;

    // Smoke test: install the driver, spawn it and start the reader thread.
    // The `initialize` handshake is not sent here; `try_new` + `start` are
    // exercised directly instead of `run`.
    crate::runtime_test!(start, {
        let installed = Driver::install().unwrap();
        let connection = Connection::try_new(&installed).unwrap();
        connection.start();
    });
}