stygian_graph/adapters/
wasm_plugin.rs

1//! WASM plugin adapter
2//!
3//! Implements [`WasmPluginPort`](crate::ports::wasm_plugin::WasmPluginPort) using `wasmtime` (feature = `"wasm-plugins"`).
4//!
//! [`MockWasmPlugin`] is always compiled; when the feature is *disabled* it is the only
//! loader available — a wasmtime-free stand-in that returns canned output, useful for tests.
7//!
8//! # Feature gate
9//!
10//! ```toml
11//! [features]
12//! wasm-plugins = ["dep:wasmtime"]
13//! ```
14
15use crate::domain::error::Result;
16use crate::ports::wasm_plugin::{WasmPluginMeta, WasmPluginPort};
17use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
18use async_trait::async_trait;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22// ─────────────────────────────────────────────────────────────────────────────
23// MockWasmPlugin — always compiled, no wasmtime dep
24// ─────────────────────────────────────────────────────────────────────────────
25
/// Mock WASM plugin loader for use in tests.
///
/// Every [`WasmPluginPort`] method answers with canned metadata and a
/// stand-in service; nothing in this type reads the filesystem or needs
/// `wasmtime`.
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::wasm_plugin::MockWasmPlugin;
/// use stygian_graph::ports::wasm_plugin::WasmPluginPort;
///
/// # tokio_test::block_on(async {
/// let loader = MockWasmPlugin::new();
/// let plugins = loader.discover().await.unwrap();
/// assert_eq!(plugins.len(), 1);
/// # });
/// ```
#[derive(Debug, Clone)]
pub struct MockWasmPlugin {
    /// Directory the mock will pretend to scan (not used, purely metadata)
    pub plugin_dir: PathBuf,
}
47
48impl Default for MockWasmPlugin {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl MockWasmPlugin {
55    /// Create a new mock loader.
56    ///
57    /// # Example
58    ///
59    /// ```
60    /// use stygian_graph::adapters::wasm_plugin::MockWasmPlugin;
61    /// let loader = MockWasmPlugin::new();
62    /// ```
63    pub fn new() -> Self {
64        Self {
65            plugin_dir: PathBuf::from("plugins"),
66        }
67    }
68
69    fn mock_meta() -> WasmPluginMeta {
70        WasmPluginMeta {
71            name: "mock-wasm-plugin".to_string(),
72            version: "0.1.0".to_string(),
73            description: "Mock WASM plugin for testing".to_string(),
74            path: PathBuf::from("plugins/mock.wasm"),
75        }
76    }
77}
78
79#[async_trait]
80impl WasmPluginPort for MockWasmPlugin {
81    async fn discover(&self) -> Result<Vec<(WasmPluginMeta, Arc<dyn ScrapingService>)>> {
82        let meta = Self::mock_meta();
83        let svc: Arc<dyn ScrapingService> = Arc::new(MockWasmService::new(meta.name.clone()));
84        Ok(vec![(meta, svc)])
85    }
86
87    async fn load(
88        &self,
89        path: &std::path::Path,
90    ) -> Result<(WasmPluginMeta, Arc<dyn ScrapingService>)> {
91        let meta = WasmPluginMeta {
92            name: path
93                .file_stem()
94                .and_then(|s| s.to_str())
95                .unwrap_or("unknown")
96                .to_string(),
97            version: "0.0.0".to_string(),
98            description: "Mock-loaded plugin".to_string(),
99            path: path.to_path_buf(),
100        };
101        let svc: Arc<dyn ScrapingService> = Arc::new(MockWasmService::new(meta.name.clone()));
102        Ok((meta, svc))
103    }
104
105    async fn loaded(&self) -> Result<Vec<WasmPluginMeta>> {
106        Ok(vec![Self::mock_meta()])
107    }
108}
109
/// Stand-in scraping service handed out by the mock WASM plugin loader.
struct MockWasmService {
    /// Plugin name, leaked so it can be returned as `&'static str`
    name: &'static str,
}

impl MockWasmService {
    /// Build a service that reports `name`.
    fn new(name: String) -> Self {
        // Deliberate leak: this is mock/test code, every name is tiny, and a
        // test process only ever creates a bounded number of these services.
        Self {
            name: Box::leak(name.into_boxed_str()),
        }
    }
}
124
125#[async_trait]
126impl ScrapingService for MockWasmService {
127    fn name(&self) -> &'static str {
128        self.name
129    }
130
131    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
132        let payload = serde_json::json!({
133            "plugin": self.name,
134            "url": input.url,
135            "status": "mock",
136        });
137        Ok(ServiceOutput {
138            data: payload.to_string(),
139            metadata: serde_json::Value::Null,
140        })
141    }
142}
143
144// ─────────────────────────────────────────────────────────────────────────────
145// WasmPluginLoader — only compiled when feature = "wasm-plugins"
146// ─────────────────────────────────────────────────────────────────────────────
147
148#[cfg(feature = "wasm-plugins")]
149pub use real::WasmPluginLoader;
150
151#[cfg(feature = "wasm-plugins")]
152mod real {
153    //! Real wasmtime-backed WASM plugin loader.
154    //!
155    //! Each `.wasm` file in `plugin_dir` is compiled with a `wasmtime::Engine`
156    //! and cached.  Invoking a plugin passes the serialised [`ServiceInput`] as
157    //! a JSON byte slice through shared linear memory and reads back the
158    //! serialised [`ServiceOutput`].
159
160    use crate::domain::error::{Result, ServiceError, StygianError};
161    use crate::ports::wasm_plugin::{WasmPluginMeta, WasmPluginPort};
162    use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
163    use std::collections::HashMap;
164    use std::path::PathBuf;
165    use std::sync::Arc;
166    use tokio::sync::RwLock;
167    use wasmtime::{Engine, Linker, Module, Store};
168
    /// Context passed into the WASM store (WASI-compatible host state).
    ///
    /// A field-less marker type: it carries no data, and it is only used as
    /// the type parameter of the `Store` and `Linker` created per execution.
    ///
    /// NOTE(review): described as WASI-compatible, but no WASI context is
    /// held here and the linker in this module registers no imports — confirm
    /// whether WASI support is planned or the wording should be dropped.
    pub struct HostState;
171
    /// Wasmtime-backed WASM plugin loader.
    ///
    /// Holds one shared [`Engine`] that every plugin is compiled against and a
    /// name-keyed record of the modules loaded so far. `load` inserts into
    /// that record and `loaded` lists it; `load` does not consult it before
    /// compiling, so loading the same file twice compiles it twice.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use stygian_graph::adapters::wasm_plugin::WasmPluginLoader;
    /// use stygian_graph::ports::wasm_plugin::WasmPluginPort;
    /// use std::path::PathBuf;
    ///
    /// # tokio_test::block_on(async {
    /// let loader = WasmPluginLoader::new(PathBuf::from("plugins"));
    /// let plugins = loader.discover().await.unwrap();
    /// println!("loaded {} plugins", plugins.len());
    /// # });
    /// ```
    pub struct WasmPluginLoader {
        /// Directory that `discover` scans for `.wasm` files
        plugin_dir: PathBuf,
        /// Engine used to compile modules; a clone is handed to each service
        engine: Engine,
        /// name → (meta, compiled module)
        ///
        /// The key is the plugin's file stem, so loading a second file with
        /// the same stem replaces the earlier entry.
        cache: RwLock<HashMap<String, (WasmPluginMeta, Module)>>,
    }
193
194    impl WasmPluginLoader {
195        /// Create a loader that scans `plugin_dir` for `.wasm` files.
196        ///
197        /// # Example
198        ///
199        /// ```no_run
200        /// use stygian_graph::adapters::wasm_plugin::WasmPluginLoader;
201        /// use std::path::PathBuf;
202        ///
203        /// let loader = WasmPluginLoader::new(PathBuf::from("plugins"));
204        /// ```
205        pub fn new(plugin_dir: PathBuf) -> Self {
206            Self {
207                plugin_dir,
208                engine: Engine::default(),
209                cache: RwLock::new(HashMap::new()),
210            }
211        }
212
213        async fn compile_module(&self, path: &std::path::Path) -> Result<(WasmPluginMeta, Module)> {
214            let bytes = tokio::fs::read(path).await.map_err(|e| {
215                StygianError::Service(ServiceError::InvalidResponse(format!(
216                    "failed to read WASM file {}: {e}",
217                    path.display()
218                )))
219            })?;
220
221            let module = Module::from_binary(&self.engine, &bytes).map_err(|e| {
222                StygianError::Service(ServiceError::InvalidResponse(format!(
223                    "failed to compile WASM module {}: {e}",
224                    path.display()
225                )))
226            })?;
227
228            // Extract name from filename (plugins name themselves via export, but
229            // filename is the authoritative ID at load time)
230            let name = path
231                .file_stem()
232                .and_then(|s| s.to_str())
233                .unwrap_or("unknown")
234                .to_string();
235
236            let meta = WasmPluginMeta {
237                name: name.clone(),
238                version: "1.0.0".to_string(),
239                description: format!("WASM plugin: {name}"),
240                path: path.to_path_buf(),
241            };
242
243            Ok((meta, module))
244        }
245    }
246
247    #[async_trait::async_trait]
248    impl WasmPluginPort for WasmPluginLoader {
249        async fn discover(&self) -> Result<Vec<(WasmPluginMeta, Arc<dyn ScrapingService>)>> {
250            let Ok(mut entries) = tokio::fs::read_dir(&self.plugin_dir).await else {
251                return Ok(vec![]); // dir missing → no plugins
252            };
253
254            let mut results = Vec::new();
255            while let Ok(Some(entry)) = entries.next_entry().await {
256                let path = entry.path();
257                if path.extension().and_then(|e| e.to_str()) != Some("wasm") {
258                    continue;
259                }
260                match self.load(&path).await {
261                    Ok(pair) => results.push(pair),
262                    Err(e) => {
263                        tracing::warn!("skipping WASM plugin {}: {e}", path.display());
264                    }
265                }
266            }
267
268            Ok(results)
269        }
270
271        async fn load(
272            &self,
273            path: &std::path::Path,
274        ) -> Result<(WasmPluginMeta, Arc<dyn ScrapingService>)> {
275            let (meta, module) = self.compile_module(path).await?;
276
277            // Cache the compiled module
278            self.cache
279                .write()
280                .await
281                .insert(meta.name.clone(), (meta.clone(), module.clone()));
282
283            let svc: Arc<dyn ScrapingService> = Arc::new(WasmScrapingService {
284                name: Box::leak(meta.name.clone().into_boxed_str()),
285                engine: self.engine.clone(),
286                module,
287            });
288
289            Ok((meta, svc))
290        }
291
292        async fn loaded(&self) -> Result<Vec<WasmPluginMeta>> {
293            let guard = self.cache.read().await;
294            Ok(guard.values().map(|(m, _)| m.clone()).collect())
295        }
296    }
297
    /// Wraps a compiled WASM module as a [`ScrapingService`].
    ///
    /// The module must export:
    /// - `alloc(size: i32) → i32` — guest allocator
    /// - `dealloc(ptr: i32, size: i32)` — guest deallocator
    /// - `plugin_execute(in_ptr: i32, in_len: i32, out_ptr_ptr: i32) → i32`
    ///   — execute and write output, returning output length
    ///   (a negative return value is treated as an error code)
    /// - `memory` — the linear memory the host reads from and writes to
    ///
    /// NOTE(review): `dealloc` is listed above, but the host code in this
    /// module never looks it up or calls it — confirm whether it is really
    /// required of plugins.
    struct WasmScrapingService {
        /// Plugin name returned by `ScrapingService::name`
        name: &'static str,
        /// Engine the module was compiled with; used to build a store per call
        engine: Engine,
        /// Compiled module that is instantiated on every `execute`
        module: Module,
    }
310
311    #[async_trait::async_trait]
312    impl ScrapingService for WasmScrapingService {
313        fn name(&self) -> &'static str {
314            self.name
315        }
316
317        async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
318            let engine = self.engine.clone();
319            let module = self.module.clone();
320
321            // WASM execution is CPU-bound — run on a blocking thread
322            let result =
323                tokio::task::spawn_blocking(move || execute_wasm_sync(&engine, &module, &input))
324                    .await
325                    .map_err(|e| {
326                        StygianError::Service(ServiceError::InvalidResponse(format!(
327                            "WASM task panicked: {e}"
328                        )))
329                    })??;
330
331            Ok(ServiceOutput {
332                data: result.to_string(),
333                metadata: serde_json::Value::default(),
334            })
335        }
336    }
337
338    #[allow(clippy::too_many_lines)]
339    fn execute_wasm_sync(
340        engine: &Engine,
341        module: &Module,
342        input: &ServiceInput,
343    ) -> Result<serde_json::Value> {
344        let mut store = Store::new(engine, HostState);
345        let linker: Linker<HostState> = Linker::new(engine);
346
347        let instance = linker.instantiate(&mut store, module).map_err(|e| {
348            StygianError::Service(ServiceError::InvalidResponse(format!(
349                "WASM instantiation failed: {e}"
350            )))
351        })?;
352
353        // Serialise input to JSON bytes
354        let input_json = serde_json::to_vec(&serde_json::json!({
355            "url": &input.url,
356            "params": &input.params,
357        }))
358        .map_err(|e| {
359            StygianError::Service(ServiceError::InvalidResponse(format!(
360                "failed to serialise WASM input: {e}"
361            )))
362        })?;
363
364        // Locate exported functions
365        let alloc: wasmtime::TypedFunc<i32, i32> =
366            instance.get_typed_func(&mut store, "alloc").map_err(|e| {
367                StygianError::Service(ServiceError::InvalidResponse(format!(
368                    "WASM missing `alloc` export: {e}"
369                )))
370            })?;
371
372        let execute: wasmtime::TypedFunc<(i32, i32, i32), i32> = instance
373            .get_typed_func(&mut store, "plugin_execute")
374            .map_err(|e| {
375                StygianError::Service(ServiceError::InvalidResponse(format!(
376                    "WASM missing `plugin_execute` export: {e}"
377                )))
378            })?;
379
380        let memory = instance.get_memory(&mut store, "memory").ok_or_else(|| {
381            StygianError::Service(ServiceError::InvalidResponse(
382                "WASM module has no exported `memory`".to_string(),
383            ))
384        })?;
385
386        // Allocate input buffer in guest memory
387        let in_len = i32::try_from(input_json.len()).map_err(|e| {
388            StygianError::Service(ServiceError::InvalidResponse(format!(
389                "WASM input too large: {e}"
390            )))
391        })?;
392        let in_ptr = alloc.call(&mut store, in_len).map_err(|e| {
393            StygianError::Service(ServiceError::InvalidResponse(format!(
394                "WASM alloc failed: {e}"
395            )))
396        })?;
397
398        // Write input bytes into guest memory
399        let in_ptr_usize = usize::try_from(in_ptr).map_err(|e| {
400            StygianError::Service(ServiceError::InvalidResponse(format!(
401                "WASM invalid input pointer: {e}"
402            )))
403        })?;
404        memory
405            .write(&mut store, in_ptr_usize, &input_json)
406            .map_err(|e| {
407                StygianError::Service(ServiceError::InvalidResponse(format!(
408                    "WASM memory write failed: {e}"
409                )))
410            })?;
411
412        // Allocate a 4-byte slot for the output pointer (i32)
413        let out_ptr_slot = alloc.call(&mut store, 4).map_err(|e| {
414            StygianError::Service(ServiceError::InvalidResponse(format!(
415                "WASM alloc (out_ptr) failed: {e}"
416            )))
417        })?;
418
419        // Call plugin_execute
420        let out_len = execute
421            .call(&mut store, (in_ptr, in_len, out_ptr_slot))
422            .map_err(|e| {
423                StygianError::Service(ServiceError::InvalidResponse(format!(
424                    "WASM plugin_execute failed: {e}"
425                )))
426            })?;
427
428        if out_len < 0 {
429            return Err(StygianError::Service(ServiceError::InvalidResponse(
430                format!("WASM plugin_execute returned error code {out_len}"),
431            )));
432        }
433
434        // Read the output pointer value from the slot
435        let mut ptr_bytes = [0u8; 4];
436        let out_ptr_slot_usize = usize::try_from(out_ptr_slot).map_err(|e| {
437            StygianError::Service(ServiceError::InvalidResponse(format!(
438                "WASM invalid output pointer slot: {e}"
439            )))
440        })?;
441        memory
442            .read(&store, out_ptr_slot_usize, &mut ptr_bytes)
443            .map_err(|e| {
444                StygianError::Service(ServiceError::InvalidResponse(format!(
445                    "WASM output ptr read failed: {e}"
446                )))
447            })?;
448        let out_ptr = usize::try_from(i32::from_le_bytes(ptr_bytes)).map_err(|e| {
449            StygianError::Service(ServiceError::InvalidResponse(format!(
450                "WASM invalid output pointer: {e}"
451            )))
452        })?;
453
454        // Read the output bytes
455        let out_len_usize = usize::try_from(out_len).map_err(|e| {
456            StygianError::Service(ServiceError::InvalidResponse(format!(
457                "WASM invalid output length: {e}"
458            )))
459        })?;
460        let mut out_bytes = vec![0u8; out_len_usize];
461        memory.read(&store, out_ptr, &mut out_bytes).map_err(|e| {
462            StygianError::Service(ServiceError::InvalidResponse(format!(
463                "WASM output read failed: {e}"
464            )))
465        })?;
466
467        let value: serde_json::Value = serde_json::from_slice(&out_bytes).map_err(|e| {
468            StygianError::Service(ServiceError::InvalidResponse(format!(
469                "WASM output deserialisation failed: {e}"
470            )))
471        })?;
472
473        Ok(value)
474    }
475}
476
477// ─────────────────────────────────────────────────────────────────────────────
478// Tests
479// ─────────────────────────────────────────────────────────────────────────────
480
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    //! Behavioural tests for the mock loader and the service it hands out.

    use super::MockWasmPlugin;
    use crate::ports::ServiceInput;
    use crate::ports::wasm_plugin::WasmPluginPort;
    use std::path::PathBuf;

    /// Build a request for `url` carrying `params`.
    fn request(url: &str, params: serde_json::Value) -> ServiceInput {
        ServiceInput {
            url: url.to_string(),
            params,
        }
    }

    #[tokio::test]
    async fn mock_loader_discover_returns_one_plugin() {
        let discovered = MockWasmPlugin::new().discover().await.unwrap();
        assert_eq!(discovered.len(), 1);
        assert_eq!(discovered[0].0.name, "mock-wasm-plugin");
    }

    #[tokio::test]
    async fn mock_loader_loaded_returns_meta() {
        let listed = MockWasmPlugin::new().loaded().await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].version, "0.1.0");
    }

    #[tokio::test]
    async fn mock_loader_load_by_path() {
        let wasm_path = PathBuf::from("plugins/example.wasm");
        let (meta, service) = MockWasmPlugin::new().load(&wasm_path).await.unwrap();
        assert_eq!(meta.name, "example");

        let reply = service
            .execute(request("https://example.com", serde_json::Value::Null))
            .await
            .unwrap();
        assert!(reply.data.contains("example"));
        assert!(reply.data.contains("mock"));
    }

    #[tokio::test]
    async fn mock_service_returns_plugin_name_in_output() {
        let mut discovered = MockWasmPlugin::new().discover().await.unwrap();
        let (_, service) = discovered.remove(0);

        let reply = service
            .execute(request("https://test.com", serde_json::Value::Null))
            .await
            .unwrap();
        assert!(reply.data.contains("mock-wasm-plugin"));
        assert!(reply.data.contains("https://test.com"));
    }

    #[test]
    fn mock_plugin_default_dir_is_plugins() {
        let expected = PathBuf::from("plugins");
        assert_eq!(MockWasmPlugin::default().plugin_dir, expected);
    }

    #[tokio::test]
    async fn mock_service_execute_with_json_params() {
        let mut discovered = MockWasmPlugin::new().discover().await.unwrap();
        let (_, service) = discovered.remove(0);

        let reply = service
            .execute(request(
                "https://api.example.com/data",
                serde_json::json!({"key": "value"}),
            ))
            .await
            .unwrap();
        // Mock service always succeeds regardless of params
        assert!(!reply.data.is_empty());
        assert!(reply.data.contains("https://api.example.com/data"));
    }

    #[tokio::test]
    async fn mock_loader_load_uses_file_stem_as_name() {
        let wasm_path = PathBuf::from("plugins/my-extractor.wasm");
        let (meta, _) = MockWasmPlugin::new().load(&wasm_path).await.unwrap();
        assert_eq!(meta.name, "my-extractor");
    }
}