Skip to main content

stygian_graph/adapters/
database.rs

//! Database source adapter — queries PostgreSQL as a pipeline data source.
//!
//! Implements [`DataSourcePort`](crate::ports::data_source::DataSourcePort) and [`ScrapingService`](crate::ports::ScrapingService) so that database queries
//! can participate in a DAG pipeline as first-class nodes.
//!
//! Requires the `postgres` feature flag (`sqlx` dependency).
//!
//! # Example
//!
//! ```no_run
//! use stygian_graph::adapters::database::DatabaseSource;
//! use stygian_graph::ports::data_source::{DataSourcePort, QueryParams};
//! use serde_json::json;
//!
//! # async fn example() {
//! let db = DatabaseSource::new("postgres://user:pass@localhost/mydb").await.unwrap();
//! let rows = db.query(QueryParams {
//!     query: "SELECT id, name FROM users LIMIT 10".into(),
//!     parameters: vec![],
//!     limit: Some(10),
//! }).await.unwrap();
//! # }
//! ```
24
25use async_trait::async_trait;
26use serde_json::{Value, json};
27use sqlx::postgres::PgPoolOptions;
28use sqlx::{Column, PgPool, Row};
29
30use crate::domain::error::{Result, ServiceError, StygianError};
31use crate::ports::data_source::{DataSourcePort, QueryParams};
32use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
33
34// ─────────────────────────────────────────────────────────────────────────────
35// DatabaseSource
36// ─────────────────────────────────────────────────────────────────────────────
37
38/// Adapter: `PostgreSQL` database as a pipeline data source.
39///
40/// Wraps a `sqlx::PgPool` and implements both [`DataSourcePort`] (for direct
41/// querying) and [`ScrapingService`] (for DAG pipeline integration).
42///
43/// # Example
44///
45/// ```no_run
46/// use stygian_graph::adapters::database::DatabaseSource;
47///
48/// # async fn example() {
49/// let db = DatabaseSource::new("postgres://localhost/testdb").await.unwrap();
50/// println!("Connected to: {}", db.source_name());
51/// # }
52/// ```
53pub struct DatabaseSource {
54    pool: PgPool,
55    name: String,
56}
57
58impl DatabaseSource {
59    /// Connect to a `PostgreSQL` database.
60    ///
61    /// # Arguments
62    ///
63    /// * `database_url` - `PostgreSQL` connection string
64    ///
65    /// # Example
66    ///
67    /// ```no_run
68    /// use stygian_graph::adapters::database::DatabaseSource;
69    ///
70    /// # async fn example() {
71    /// let db = DatabaseSource::new("postgres://localhost/mydb").await.unwrap();
72    /// # }
73    /// ```
74    pub async fn new(database_url: &str) -> Result<Self> {
75        let pool = PgPoolOptions::new()
76            .max_connections(5)
77            .connect(database_url)
78            .await
79            .map_err(|e| {
80                StygianError::Service(ServiceError::Unavailable(format!(
81                    "database connection failed: {e}"
82                )))
83            })?;
84
85        Ok(Self {
86            pool,
87            name: "postgres".to_string(),
88        })
89    }
90
91    /// Create from an existing pool (useful for testing or shared connections).
92    ///
93    /// # Example
94    ///
95    /// ```no_run
96    /// use stygian_graph::adapters::database::DatabaseSource;
97    /// use sqlx::PgPool;
98    ///
99    /// # async fn example(pool: PgPool) {
100    /// let db = DatabaseSource::from_pool(pool);
101    /// # }
102    /// ```
103    #[must_use]
104    pub fn from_pool(pool: PgPool) -> Self {
105        Self {
106            pool,
107            name: "postgres".to_string(),
108        }
109    }
110
111    /// Return the name of this source for display/logging.
112    #[must_use]
113    pub fn source_name(&self) -> &str {
114        &self.name
115    }
116}
117
118// ─────────────────────────────────────────────────────────────────────────────
119// DataSourcePort
120// ─────────────────────────────────────────────────────────────────────────────
121
122#[async_trait]
123impl DataSourcePort for DatabaseSource {
124    async fn query(&self, params: QueryParams) -> Result<Vec<Value>> {
125        let rows: Vec<sqlx::postgres::PgRow> = sqlx::query(&params.query)
126            .fetch_all(&self.pool)
127            .await
128            .map_err(|e| {
129                StygianError::Service(ServiceError::InvalidResponse(format!("query failed: {e}")))
130            })?;
131
132        let mut results = Vec::with_capacity(rows.len());
133        for row in &rows {
134            // Convert each row to a JSON object using column metadata
135            let columns = row.columns();
136            let mut obj = serde_json::Map::new();
137            for col in columns {
138                let name = col.name().to_string();
139                let value: Value = Self::extract_column_value(row, col);
140                obj.insert(name, value);
141            }
142            results.push(Value::Object(obj));
143
144            if let Some(limit) = params.limit
145                && results.len() as u64 >= limit
146            {
147                break;
148            }
149        }
150
151        Ok(results)
152    }
153
154    async fn healthcheck(&self) -> Result<()> {
155        sqlx::query("SELECT 1")
156            .fetch_one(&self.pool)
157            .await
158            .map_err(|e| {
159                StygianError::Service(ServiceError::Unavailable(format!(
160                    "healthcheck failed: {e}"
161                )))
162            })?;
163        Ok(())
164    }
165
166    fn source_name(&self) -> &str {
167        &self.name
168    }
169}
170
171impl DatabaseSource {
172    /// Extract a column value from a `PgRow` as a `serde_json::Value`.
173    ///
174    /// Handles common Postgres types; falls back to the debug representation
175    /// for unsupported types.
176    fn extract_column_value(row: &sqlx::postgres::PgRow, col: &sqlx::postgres::PgColumn) -> Value {
177        use sqlx::Column;
178        use sqlx::TypeInfo;
179
180        let type_name = col.type_info().name();
181        let idx = col.ordinal();
182
183        match type_name {
184            "INT4" | "INT2" => row.try_get::<i32, _>(idx).map_or(Value::Null, |v| json!(v)),
185            "INT8" => row.try_get::<i64, _>(idx).map_or(Value::Null, |v| json!(v)),
186            "FLOAT4" => row.try_get::<f32, _>(idx).map_or(Value::Null, |v| json!(v)),
187            "FLOAT8" | "NUMERIC" => row.try_get::<f64, _>(idx).map_or(Value::Null, |v| json!(v)),
188            "BOOL" => row
189                .try_get::<bool, _>(idx)
190                .map_or(Value::Null, |v| json!(v)),
191            "TEXT" | "VARCHAR" | "CHAR" | "NAME" => row
192                .try_get::<String, _>(idx)
193                .map_or(Value::Null, |v| json!(v)),
194            "JSON" | "JSONB" => row.try_get::<Value, _>(idx).unwrap_or(Value::Null),
195            _ => row
196                .try_get::<String, _>(idx)
197                .map_or(Value::Null, |v| json!(v)),
198        }
199    }
200}
201
202// ─────────────────────────────────────────────────────────────────────────────
203// ScrapingService (DAG integration)
204// ─────────────────────────────────────────────────────────────────────────────
205
206#[async_trait]
207impl ScrapingService for DatabaseSource {
208    /// Execute a database query from pipeline parameters.
209    ///
210    /// Expected params:
211    /// ```json
212    /// { "query": "SELECT ...", "parameters": [...], "limit": 100 }
213    /// ```
214    ///
215    /// # Example
216    ///
217    /// ```no_run
218    /// # use stygian_graph::ports::{ScrapingService, ServiceInput};
219    /// # use stygian_graph::adapters::database::DatabaseSource;
220    /// # use serde_json::json;
221    /// # async fn example(db: DatabaseSource) {
222    /// let input = ServiceInput {
223    ///     url: String::new(),
224    ///     params: json!({"query": "SELECT 1 AS n", "parameters": [], "limit": 10}),
225    /// };
226    /// let result = db.execute(input).await.unwrap();
227    /// # }
228    /// ```
229    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
230        let query_str = input
231            .params
232            .get("query")
233            .and_then(|v| v.as_str())
234            .ok_or_else(|| {
235                StygianError::Service(ServiceError::InvalidResponse(
236                    "missing 'query' in params".into(),
237                ))
238            })?
239            .to_string();
240
241        let parameters: Vec<Value> = input
242            .params
243            .get("parameters")
244            .and_then(|v| v.as_array())
245            .cloned()
246            .unwrap_or_default();
247
248        let limit = input.params.get("limit").and_then(Value::as_u64);
249
250        let params = QueryParams {
251            query: query_str,
252            parameters,
253            limit,
254        };
255
256        let rows = self.query(params).await?;
257        let row_count = rows.len();
258
259        Ok(ServiceOutput {
260            data: serde_json::to_string(&rows).unwrap_or_default(),
261            metadata: json!({
262                "source": self.name,
263                "row_count": row_count,
264            }),
265        })
266    }
267
268    fn name(&self) -> &'static str {
269        "database"
270    }
271}