Skip to main content

stygian_graph/adapters/
database.rs

1//! Database source adapter — queries PostgreSQL as a pipeline data source.
2//!
3//! Implements [`DataSourcePort`](crate::ports::data_source::DataSourcePort) and [`ScrapingService`](crate::ports::ScrapingService) so database queries
4//! can participate in a DAG pipeline as a first-class node.
5//!
6//! Requires the `postgres` feature flag (`sqlx` dependency).
7//!
8//! # Example
9//!
10//! ```no_run
11//! use stygian_graph::adapters::database::DatabaseSource;
12//! use stygian_graph::ports::data_source::{DataSourcePort, QueryParams};
13//! use serde_json::json;
14//!
15//! # async fn example() {
16//! let db = DatabaseSource::new("postgres://user:pass@localhost/mydb").await.unwrap();
17//! let rows = db.query(QueryParams {
18//!     query: "SELECT id, name FROM users LIMIT 10".into(),
19//!     parameters: vec![],
20//!     limit: Some(10),
21//! }).await.unwrap();
22//! # }
23//! ```
24
25use async_trait::async_trait;
26use serde_json::{Value, json};
27use sqlx::postgres::PgPoolOptions;
28use sqlx::{Column, PgPool, Row};
29
30use crate::domain::error::{Result, ServiceError, StygianError};
31use crate::ports::data_source::{DataSourcePort, QueryParams};
32use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
33
34// ─────────────────────────────────────────────────────────────────────────────
35// DatabaseSource
36// ─────────────────────────────────────────────────────────────────────────────
37
38/// Adapter: `PostgreSQL` database as a pipeline data source.
39///
40/// Wraps a `sqlx::PgPool` and implements both [`DataSourcePort`] (for direct
41/// querying) and [`ScrapingService`] (for DAG pipeline integration).
42///
43/// # Example
44///
45/// ```no_run
46/// use stygian_graph::adapters::database::DatabaseSource;
47///
48/// # async fn example() {
49/// let db = DatabaseSource::new("postgres://localhost/testdb").await.unwrap();
50/// println!("Connected to: {}", db.source_name());
51/// # }
52/// ```
53pub struct DatabaseSource {
54    pool: PgPool,
55    name: String,
56}
57
58impl DatabaseSource {
59    /// Connect to a `PostgreSQL` database.
60    ///
61    /// # Arguments
62    ///
63    /// * `database_url` - `PostgreSQL` connection string
64    ///
65    /// # Example
66    ///
67    /// ```no_run
68    /// use stygian_graph::adapters::database::DatabaseSource;
69    ///
70    /// # async fn example() {
71    /// let db = DatabaseSource::new("postgres://localhost/mydb").await.unwrap();
72    /// # }
73    /// ```
74    ///
75    /// # Errors
76    ///
77    /// Returns [`StygianError`] wrapping `ServiceError::Unavailable` when the
78    /// supplied connection URL is malformed or the database is unreachable.
79    pub async fn new(database_url: &str) -> Result<Self> {
80        let pool = PgPoolOptions::new()
81            .max_connections(5)
82            .connect(database_url)
83            .await
84            .map_err(|e| {
85                StygianError::Service(ServiceError::Unavailable(format!(
86                    "database connection failed: {e}"
87                )))
88            })?;
89
90        Ok(Self {
91            pool,
92            name: "postgres".to_string(),
93        })
94    }
95
96    /// Create from an existing pool (useful for testing or shared connections).
97    ///
98    /// # Example
99    ///
100    /// ```no_run
101    /// use stygian_graph::adapters::database::DatabaseSource;
102    /// use sqlx::PgPool;
103    ///
104    /// # async fn example(pool: PgPool) {
105    /// let db = DatabaseSource::from_pool(pool);
106    /// # }
107    /// ```
108    #[must_use]
109    pub fn from_pool(pool: PgPool) -> Self {
110        Self {
111            pool,
112            name: "postgres".to_string(),
113        }
114    }
115
116    /// Return the name of this source for display/logging.
117    #[must_use]
118    pub fn source_name(&self) -> &str {
119        &self.name
120    }
121}
122
123// ─────────────────────────────────────────────────────────────────────────────
124// DataSourcePort
125// ─────────────────────────────────────────────────────────────────────────────
126
127#[async_trait]
128impl DataSourcePort for DatabaseSource {
129    async fn query(&self, params: QueryParams) -> Result<Vec<Value>> {
130        let rows: Vec<sqlx::postgres::PgRow> = sqlx::query(&params.query)
131            .fetch_all(&self.pool)
132            .await
133            .map_err(|e| {
134                StygianError::Service(ServiceError::InvalidResponse(format!("query failed: {e}")))
135            })?;
136
137        let mut results = Vec::with_capacity(rows.len());
138        for row in &rows {
139            // Convert each row to a JSON object using column metadata
140            let columns = row.columns();
141            let mut obj = serde_json::Map::new();
142            for col in columns {
143                let name = col.name().to_string();
144                let value: Value = Self::extract_column_value(row, col);
145                obj.insert(name, value);
146            }
147            results.push(Value::Object(obj));
148
149            if let Some(limit) = params.limit
150                && results.len() as u64 >= limit
151            {
152                break;
153            }
154        }
155
156        Ok(results)
157    }
158
159    async fn healthcheck(&self) -> Result<()> {
160        sqlx::query("SELECT 1")
161            .fetch_one(&self.pool)
162            .await
163            .map_err(|e| {
164                StygianError::Service(ServiceError::Unavailable(format!(
165                    "healthcheck failed: {e}"
166                )))
167            })?;
168        Ok(())
169    }
170
171    fn source_name(&self) -> &str {
172        &self.name
173    }
174}
175
176impl DatabaseSource {
177    /// Extract a column value from a `PgRow` as a `serde_json::Value`.
178    ///
179    /// Handles common Postgres types; falls back to the debug representation
180    /// for unsupported types.
181    fn extract_column_value(row: &sqlx::postgres::PgRow, col: &sqlx::postgres::PgColumn) -> Value {
182        use sqlx::Column;
183        use sqlx::TypeInfo;
184
185        let type_name = col.type_info().name();
186        let idx = col.ordinal();
187
188        match type_name {
189            "INT4" | "INT2" => row.try_get::<i32, _>(idx).map_or(Value::Null, |v| json!(v)),
190            "INT8" => row.try_get::<i64, _>(idx).map_or(Value::Null, |v| json!(v)),
191            "FLOAT4" => row.try_get::<f32, _>(idx).map_or(Value::Null, |v| json!(v)),
192            "FLOAT8" | "NUMERIC" => row.try_get::<f64, _>(idx).map_or(Value::Null, |v| json!(v)),
193            "BOOL" => row
194                .try_get::<bool, _>(idx)
195                .map_or(Value::Null, |v| json!(v)),
196            "TEXT" | "VARCHAR" | "CHAR" | "NAME" => row
197                .try_get::<String, _>(idx)
198                .map_or(Value::Null, |v| json!(v)),
199            "JSON" | "JSONB" => row.try_get::<Value, _>(idx).unwrap_or(Value::Null),
200            _ => row
201                .try_get::<String, _>(idx)
202                .map_or(Value::Null, |v| json!(v)),
203        }
204    }
205}
206
207// ─────────────────────────────────────────────────────────────────────────────
208// ScrapingService (DAG integration)
209// ─────────────────────────────────────────────────────────────────────────────
210
211#[async_trait]
212impl ScrapingService for DatabaseSource {
213    /// Execute a database query from pipeline parameters.
214    ///
215    /// Expected params:
216    /// ```json
217    /// { "query": "SELECT ...", "parameters": [...], "limit": 100 }
218    /// ```
219    ///
220    /// # Example
221    ///
222    /// ```no_run
223    /// # use stygian_graph::ports::{ScrapingService, ServiceInput};
224    /// # use stygian_graph::adapters::database::DatabaseSource;
225    /// # use serde_json::json;
226    /// # async fn example(db: DatabaseSource) {
227    /// let input = ServiceInput {
228    ///     url: String::new(),
229    ///     params: json!({"query": "SELECT 1 AS n", "parameters": [], "limit": 10}),
230    /// };
231    /// let result = db.execute(input).await.unwrap();
232    /// # }
233    /// ```
234    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
235        let query_str = input
236            .params
237            .get("query")
238            .and_then(|v| v.as_str())
239            .ok_or_else(|| {
240                StygianError::Service(ServiceError::InvalidResponse(
241                    "missing 'query' in params".into(),
242                ))
243            })?
244            .to_string();
245
246        let parameters: Vec<Value> = input
247            .params
248            .get("parameters")
249            .and_then(|v| v.as_array())
250            .cloned()
251            .unwrap_or_default();
252
253        let limit = input.params.get("limit").and_then(Value::as_u64);
254
255        let params = QueryParams {
256            query: query_str,
257            parameters,
258            limit,
259        };
260
261        let rows = self.query(params).await?;
262        let row_count = rows.len();
263
264        Ok(ServiceOutput {
265            data: serde_json::to_string(&rows).unwrap_or_default(),
266            metadata: json!({
267                "source": self.name,
268                "row_count": row_count,
269            }),
270        })
271    }
272
273    fn name(&self) -> &'static str {
274        "database"
275    }
276}