Skip to main content

stygian_graph/adapters/
wasm_plugin.rs

1//! WASM plugin adapter
2//!
3//! Implements [`WasmPluginPort`](crate::ports::wasm_plugin::WasmPluginPort) using `wasmtime` (feature = `"wasm-plugins"`).
4//!
5//! When the feature is *disabled*, only [`MockWasmPlugin`](wasm_plugin::MockWasmPlugin) is compiled — a
6//! no-wasmtime stand-in that returns canned output, useful for tests.
7//!
8//! # Feature gate
9//!
10//! ```toml
11//! [features]
12//! wasm-plugins = ["dep:wasmtime"]
13//! ```
14
15use crate::domain::error::Result;
16use crate::ports::wasm_plugin::{WasmPluginMeta, WasmPluginPort};
17use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
18use async_trait::async_trait;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22// ─────────────────────────────────────────────────────────────────────────────
23// MockWasmPlugin — always compiled, no wasmtime dep
24// ─────────────────────────────────────────────────────────────────────────────
25
/// Mock WASM plugin loader for use in tests.
///
/// Returns a predetermined plugin and JSON output and never touches the
/// filesystem.
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::wasm_plugin::MockWasmPlugin;
/// use stygian_graph::ports::wasm_plugin::WasmPluginPort;
///
/// # tokio_test::block_on(async {
/// let loader = MockWasmPlugin::new();
/// let plugins = loader.discover().await.unwrap();
/// assert_eq!(plugins.len(), 1);
/// # });
/// ```
#[derive(Debug, Clone)]
pub struct MockWasmPlugin {
    /// Directory the mock will pretend to scan (not used, purely metadata)
    pub plugin_dir: PathBuf,
}
47
48impl Default for MockWasmPlugin {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl MockWasmPlugin {
55    /// Create a new mock loader.
56    ///
57    /// # Example
58    ///
59    /// ```
60    /// use stygian_graph::adapters::wasm_plugin::MockWasmPlugin;
61    /// let loader = MockWasmPlugin::new();
62    /// ```
63    #[must_use]
64    pub fn new() -> Self {
65        Self {
66            plugin_dir: PathBuf::from("plugins"),
67        }
68    }
69
70    fn mock_meta() -> WasmPluginMeta {
71        WasmPluginMeta {
72            name: "mock-wasm-plugin".to_string(),
73            version: "0.1.0".to_string(),
74            description: "Mock WASM plugin for testing".to_string(),
75            path: PathBuf::from("plugins/mock.wasm"),
76        }
77    }
78}
79
80#[async_trait]
81impl WasmPluginPort for MockWasmPlugin {
82    async fn discover(&self) -> Result<Vec<(WasmPluginMeta, Arc<dyn ScrapingService>)>> {
83        let meta = Self::mock_meta();
84        let svc: Arc<dyn ScrapingService> = Arc::new(MockWasmService::new(meta.name.clone()));
85        Ok(vec![(meta, svc)])
86    }
87
88    async fn load(
89        &self,
90        path: &std::path::Path,
91    ) -> Result<(WasmPluginMeta, Arc<dyn ScrapingService>)> {
92        let meta = WasmPluginMeta {
93            name: path
94                .file_stem()
95                .and_then(|s| s.to_str())
96                .unwrap_or("unknown")
97                .to_string(),
98            version: "0.0.0".to_string(),
99            description: "Mock-loaded plugin".to_string(),
100            path: path.to_path_buf(),
101        };
102        let svc: Arc<dyn ScrapingService> = Arc::new(MockWasmService::new(meta.name.clone()));
103        Ok((meta, svc))
104    }
105
106    async fn loaded(&self) -> Result<Vec<WasmPluginMeta>> {
107        Ok(vec![Self::mock_meta()])
108    }
109}
110
/// Scraping service handed out by the mock WASM plugin loader.
struct MockWasmService {
    /// Plugin name, leaked so it can be returned as `&'static str`
    name: &'static str,
}

impl MockWasmService {
    /// Wrap `name` in a service, promoting it to a `'static` string.
    fn new(name: String) -> Self {
        // Deliberate leak, acceptable in test/mock code: names are short and
        // a test process only creates a bounded number of mock services.
        Self {
            name: Box::leak(name.into_boxed_str()),
        }
    }
}
125
126#[async_trait]
127impl ScrapingService for MockWasmService {
128    fn name(&self) -> &'static str {
129        self.name
130    }
131
132    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
133        let payload = serde_json::json!({
134            "plugin": self.name,
135            "url": input.url,
136            "status": "mock",
137        });
138        Ok(ServiceOutput {
139            data: payload.to_string(),
140            metadata: serde_json::Value::Null,
141        })
142    }
143}
144
145// ─────────────────────────────────────────────────────────────────────────────
146// WasmPluginLoader — only compiled when feature = "wasm-plugins"
147// ─────────────────────────────────────────────────────────────────────────────
148
149#[cfg(feature = "wasm-plugins")]
150pub use real::WasmPluginLoader;
151
152#[cfg(feature = "wasm-plugins")]
153mod real {
154    //! Real wasmtime-backed WASM plugin loader.
155    //!
156    //! Each `.wasm` file in `plugin_dir` is compiled with a `wasmtime::Engine`
157    //! and cached.  Invoking a plugin passes the serialised [`ServiceInput`] as
158    //! a JSON byte slice through shared linear memory and reads back the
159    //! serialised [`ServiceOutput`].
160
161    use crate::domain::error::{Result, ServiceError, StygianError};
162    use crate::ports::wasm_plugin::{WasmPluginMeta, WasmPluginPort};
163    use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
164    use std::collections::HashMap;
165    use std::path::PathBuf;
166    use std::sync::Arc;
167    use tokio::sync::RwLock;
168    use wasmtime::{Engine, Linker, Module, Store};
169
170    /// Context passed into the WASM store (WASI-compatible host state).
171    pub struct HostState;
172
173    /// Wasmtime-backed WASM plugin loader.
174    ///
175    /// # Example
176    ///
177    /// ```no_run
178    /// use stygian_graph::adapters::wasm_plugin::WasmPluginLoader;
179    /// use stygian_graph::ports::wasm_plugin::WasmPluginPort;
180    /// use std::path::PathBuf;
181    ///
182    /// # tokio_test::block_on(async {
183    /// let loader = WasmPluginLoader::new(PathBuf::from("plugins"));
184    /// let plugins = loader.discover().await.unwrap();
185    /// println!("loaded {} plugins", plugins.len());
186    /// # });
187    /// ```
188    pub struct WasmPluginLoader {
189        plugin_dir: PathBuf,
190        engine: Engine,
191        /// name → (meta, compiled module)
192        cache: RwLock<HashMap<String, (WasmPluginMeta, Module)>>,
193    }
194
195    impl WasmPluginLoader {
196        /// Create a loader that scans `plugin_dir` for `.wasm` files.
197        ///
198        /// # Example
199        ///
200        /// ```no_run
201        /// use stygian_graph::adapters::wasm_plugin::WasmPluginLoader;
202        /// use std::path::PathBuf;
203        ///
204        /// let loader = WasmPluginLoader::new(PathBuf::from("plugins"));
205        /// ```
206        #[must_use]
207        pub fn new(plugin_dir: PathBuf) -> Self {
208            Self {
209                plugin_dir,
210                engine: Engine::default(),
211                cache: RwLock::new(HashMap::new()),
212            }
213        }
214
215        async fn compile_module(&self, path: &std::path::Path) -> Result<(WasmPluginMeta, Module)> {
216            let bytes = tokio::fs::read(path).await.map_err(|e| {
217                StygianError::Service(ServiceError::InvalidResponse(format!(
218                    "failed to read WASM file {}: {e}",
219                    path.display()
220                )))
221            })?;
222
223            let module = Module::from_binary(&self.engine, &bytes).map_err(|e| {
224                StygianError::Service(ServiceError::InvalidResponse(format!(
225                    "failed to compile WASM module {}: {e}",
226                    path.display()
227                )))
228            })?;
229
230            // Extract name from filename (plugins name themselves via export, but
231            // filename is the authoritative ID at load time)
232            let name = path
233                .file_stem()
234                .and_then(|s| s.to_str())
235                .unwrap_or("unknown")
236                .to_string();
237
238            let meta = WasmPluginMeta {
239                name: name.clone(),
240                version: "1.0.0".to_string(),
241                description: format!("WASM plugin: {name}"),
242                path: path.to_path_buf(),
243            };
244
245            Ok((meta, module))
246        }
247    }
248
249    #[async_trait::async_trait]
250    impl WasmPluginPort for WasmPluginLoader {
251        async fn discover(&self) -> Result<Vec<(WasmPluginMeta, Arc<dyn ScrapingService>)>> {
252            let Ok(mut entries) = tokio::fs::read_dir(&self.plugin_dir).await else {
253                return Ok(vec![]); // dir missing → no plugins
254            };
255
256            let mut results = Vec::new();
257            while let Ok(Some(entry)) = entries.next_entry().await {
258                let path = entry.path();
259                if path.extension().and_then(|e| e.to_str()) != Some("wasm") {
260                    continue;
261                }
262                match self.load(&path).await {
263                    Ok(pair) => results.push(pair),
264                    Err(e) => {
265                        tracing::warn!("skipping WASM plugin {}: {e}", path.display());
266                    }
267                }
268            }
269
270            Ok(results)
271        }
272
273        async fn load(
274            &self,
275            path: &std::path::Path,
276        ) -> Result<(WasmPluginMeta, Arc<dyn ScrapingService>)> {
277            let (meta, module) = self.compile_module(path).await?;
278
279            // Cache the compiled module
280            self.cache
281                .write()
282                .await
283                .insert(meta.name.clone(), (meta.clone(), module.clone()));
284
285            let svc: Arc<dyn ScrapingService> = Arc::new(WasmScrapingService {
286                name: Box::leak(meta.name.clone().into_boxed_str()),
287                engine: self.engine.clone(),
288                module,
289            });
290
291            Ok((meta, svc))
292        }
293
294        async fn loaded(&self) -> Result<Vec<WasmPluginMeta>> {
295            let guard = self.cache.read().await;
296            Ok(guard.values().map(|(m, _)| m.clone()).collect())
297        }
298    }
299
300    /// Wraps a compiled WASM module as a [`ScrapingService`].
301    ///
302    /// The module must export:
303    /// - `alloc(size: i32) → i32` — guest allocator
304    /// - `dealloc(ptr: i32, size: i32)` — guest deallocator
305    /// - `plugin_execute(in_ptr: i32, in_len: i32, out_ptr_ptr: i32) → i32`
306    ///   — execute and write output, returning output length
307    struct WasmScrapingService {
308        name: &'static str,
309        engine: Engine,
310        module: Module,
311    }
312
313    #[async_trait::async_trait]
314    impl ScrapingService for WasmScrapingService {
315        fn name(&self) -> &'static str {
316            self.name
317        }
318
319        async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
320            let engine = self.engine.clone();
321            let module = self.module.clone();
322
323            // WASM execution is CPU-bound — run on a blocking thread
324            let result =
325                tokio::task::spawn_blocking(move || execute_wasm_sync(&engine, &module, &input))
326                    .await
327                    .map_err(|e| {
328                        StygianError::Service(ServiceError::InvalidResponse(format!(
329                            "WASM task panicked: {e}"
330                        )))
331                    })??;
332
333            Ok(ServiceOutput {
334                data: result.to_string(),
335                metadata: serde_json::Value::default(),
336            })
337        }
338    }
339
340    #[allow(clippy::too_many_lines)]
341    fn execute_wasm_sync(
342        engine: &Engine,
343        module: &Module,
344        input: &ServiceInput,
345    ) -> Result<serde_json::Value> {
346        let mut store = Store::new(engine, HostState);
347        let linker: Linker<HostState> = Linker::new(engine);
348
349        let instance = linker.instantiate(&mut store, module).map_err(|e| {
350            StygianError::Service(ServiceError::InvalidResponse(format!(
351                "WASM instantiation failed: {e}"
352            )))
353        })?;
354
355        // Serialise input to JSON bytes
356        let input_json = serde_json::to_vec(&serde_json::json!({
357            "url": &input.url,
358            "params": &input.params,
359        }))
360        .map_err(|e| {
361            StygianError::Service(ServiceError::InvalidResponse(format!(
362                "failed to serialise WASM input: {e}"
363            )))
364        })?;
365
366        // Locate exported functions
367        let alloc: wasmtime::TypedFunc<i32, i32> =
368            instance.get_typed_func(&mut store, "alloc").map_err(|e| {
369                StygianError::Service(ServiceError::InvalidResponse(format!(
370                    "WASM missing `alloc` export: {e}"
371                )))
372            })?;
373
374        let execute: wasmtime::TypedFunc<(i32, i32, i32), i32> = instance
375            .get_typed_func(&mut store, "plugin_execute")
376            .map_err(|e| {
377                StygianError::Service(ServiceError::InvalidResponse(format!(
378                    "WASM missing `plugin_execute` export: {e}"
379                )))
380            })?;
381
382        let memory = instance.get_memory(&mut store, "memory").ok_or_else(|| {
383            StygianError::Service(ServiceError::InvalidResponse(
384                "WASM module has no exported `memory`".to_string(),
385            ))
386        })?;
387
388        // Allocate input buffer in guest memory
389        let in_len = i32::try_from(input_json.len()).map_err(|e| {
390            StygianError::Service(ServiceError::InvalidResponse(format!(
391                "WASM input too large: {e}"
392            )))
393        })?;
394        let in_ptr = alloc.call(&mut store, in_len).map_err(|e| {
395            StygianError::Service(ServiceError::InvalidResponse(format!(
396                "WASM alloc failed: {e}"
397            )))
398        })?;
399
400        // Write input bytes into guest memory
401        let in_ptr_usize = usize::try_from(in_ptr).map_err(|e| {
402            StygianError::Service(ServiceError::InvalidResponse(format!(
403                "WASM invalid input pointer: {e}"
404            )))
405        })?;
406        memory
407            .write(&mut store, in_ptr_usize, &input_json)
408            .map_err(|e| {
409                StygianError::Service(ServiceError::InvalidResponse(format!(
410                    "WASM memory write failed: {e}"
411                )))
412            })?;
413
414        // Allocate a 4-byte slot for the output pointer (i32)
415        let out_ptr_slot = alloc.call(&mut store, 4).map_err(|e| {
416            StygianError::Service(ServiceError::InvalidResponse(format!(
417                "WASM alloc (out_ptr) failed: {e}"
418            )))
419        })?;
420
421        // Call plugin_execute
422        let out_len = execute
423            .call(&mut store, (in_ptr, in_len, out_ptr_slot))
424            .map_err(|e| {
425                StygianError::Service(ServiceError::InvalidResponse(format!(
426                    "WASM plugin_execute failed: {e}"
427                )))
428            })?;
429
430        if out_len < 0 {
431            return Err(StygianError::Service(ServiceError::InvalidResponse(
432                format!("WASM plugin_execute returned error code {out_len}"),
433            )));
434        }
435
436        // Read the output pointer value from the slot
437        let mut ptr_bytes = [0u8; 4];
438        let out_ptr_slot_usize = usize::try_from(out_ptr_slot).map_err(|e| {
439            StygianError::Service(ServiceError::InvalidResponse(format!(
440                "WASM invalid output pointer slot: {e}"
441            )))
442        })?;
443        memory
444            .read(&store, out_ptr_slot_usize, &mut ptr_bytes)
445            .map_err(|e| {
446                StygianError::Service(ServiceError::InvalidResponse(format!(
447                    "WASM output ptr read failed: {e}"
448                )))
449            })?;
450        let out_ptr = usize::try_from(i32::from_le_bytes(ptr_bytes)).map_err(|e| {
451            StygianError::Service(ServiceError::InvalidResponse(format!(
452                "WASM invalid output pointer: {e}"
453            )))
454        })?;
455
456        // Read the output bytes
457        let out_len_usize = usize::try_from(out_len).map_err(|e| {
458            StygianError::Service(ServiceError::InvalidResponse(format!(
459                "WASM invalid output length: {e}"
460            )))
461        })?;
462        let mut out_bytes = vec![0u8; out_len_usize];
463        memory.read(&store, out_ptr, &mut out_bytes).map_err(|e| {
464            StygianError::Service(ServiceError::InvalidResponse(format!(
465                "WASM output read failed: {e}"
466            )))
467        })?;
468
469        let value: serde_json::Value = serde_json::from_slice(&out_bytes).map_err(|e| {
470            StygianError::Service(ServiceError::InvalidResponse(format!(
471                "WASM output deserialisation failed: {e}"
472            )))
473        })?;
474
475        Ok(value)
476    }
477}
478
479// ─────────────────────────────────────────────────────────────────────────────
480// Tests
481// ─────────────────────────────────────────────────────────────────────────────
482
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::MockWasmPlugin;
    use crate::ports::ServiceInput;
    use crate::ports::wasm_plugin::WasmPluginPort;
    use std::path::PathBuf;

    /// Build a request for `url` carrying `params`.
    fn request(url: &str, params: serde_json::Value) -> ServiceInput {
        ServiceInput {
            url: url.to_string(),
            params,
        }
    }

    /// `discover` reports exactly the canned plugin.
    #[tokio::test]
    async fn mock_loader_discover_returns_one_plugin() {
        let plugins = MockWasmPlugin::new().discover().await.unwrap();
        assert_eq!(plugins.len(), 1);
        assert_eq!(plugins[0].0.name, "mock-wasm-plugin");
    }

    /// `loaded` reports the canned metadata, including its version.
    #[tokio::test]
    async fn mock_loader_loaded_returns_meta() {
        let metas = MockWasmPlugin::new().loaded().await.unwrap();
        assert_eq!(metas.len(), 1);
        assert_eq!(metas[0].version, "0.1.0");
    }

    /// A "loaded" plugin is named after its file and its service answers.
    #[tokio::test]
    async fn mock_loader_load_by_path() {
        let path = PathBuf::from("plugins/example.wasm");
        let (meta, svc) = MockWasmPlugin::new().load(&path).await.unwrap();
        assert_eq!(meta.name, "example");

        let output = svc
            .execute(request("https://example.com", serde_json::Value::Null))
            .await
            .unwrap();
        assert!(output.data.contains("example"));
        assert!(output.data.contains("mock"));
    }

    /// The discovered service echoes its plugin name and the requested URL.
    #[tokio::test]
    async fn mock_service_returns_plugin_name_in_output() {
        let mut plugins = MockWasmPlugin::new().discover().await.unwrap();
        let (_, svc) = plugins.remove(0);

        let out = svc
            .execute(request("https://test.com", serde_json::Value::Null))
            .await
            .unwrap();
        assert!(out.data.contains("mock-wasm-plugin"));
        assert!(out.data.contains("https://test.com"));
    }

    /// `Default` points the mock at the `plugins` directory.
    #[test]
    fn mock_plugin_default_dir_is_plugins() {
        let expected = PathBuf::from("plugins");
        assert_eq!(MockWasmPlugin::default().plugin_dir, expected);
    }

    /// Structured params do not affect the mock's success or URL echo.
    #[tokio::test]
    async fn mock_service_execute_with_json_params() {
        let mut plugins = MockWasmPlugin::new().discover().await.unwrap();
        let (_, svc) = plugins.remove(0);

        let out = svc
            .execute(request(
                "https://api.example.com/data",
                serde_json::json!({"key": "value"}),
            ))
            .await
            .unwrap();
        // Mock service always succeeds regardless of params
        assert!(!out.data.is_empty());
        assert!(out.data.contains("https://api.example.com/data"));
    }

    /// The file stem, hyphens included, becomes the plugin name.
    #[tokio::test]
    async fn mock_loader_load_uses_file_stem_as_name() {
        let path = PathBuf::from("plugins/my-extractor.wasm");
        let (meta, _) = MockWasmPlugin::new().load(&path).await.unwrap();
        assert_eq!(meta.name, "my-extractor");
    }
}