stygian_graph/adapters/database.rs
1//! Database source adapter — queries PostgreSQL as a pipeline data source.
2//!
3//! Implements [`DataSourcePort`](crate::ports::data_source::DataSourcePort) and [`ScrapingService`](crate::ports::ScrapingService) so database queries
4//! can participate in a DAG pipeline as a first-class node.
5//!
6//! Requires the `postgres` feature flag (`sqlx` dependency).
7//!
8//! # Example
9//!
10//! ```no_run
11//! use stygian_graph::adapters::database::DatabaseSource;
12//! use stygian_graph::ports::data_source::{DataSourcePort, QueryParams};
13//! use serde_json::json;
14//!
15//! # async fn example() {
16//! let db = DatabaseSource::new("postgres://user:pass@localhost/mydb").await.unwrap();
17//! let rows = db.query(QueryParams {
18//! query: "SELECT id, name FROM users LIMIT 10".into(),
19//! parameters: vec![],
20//! limit: Some(10),
21//! }).await.unwrap();
22//! # }
23//! ```
24
25use async_trait::async_trait;
26use serde_json::{Value, json};
27use sqlx::postgres::PgPoolOptions;
28use sqlx::{Column, PgPool, Row};
29
30use crate::domain::error::{Result, ServiceError, StygianError};
31use crate::ports::data_source::{DataSourcePort, QueryParams};
32use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
33
34// ─────────────────────────────────────────────────────────────────────────────
35// DatabaseSource
36// ─────────────────────────────────────────────────────────────────────────────
37
38/// Adapter: `PostgreSQL` database as a pipeline data source.
39///
40/// Wraps a `sqlx::PgPool` and implements both [`DataSourcePort`] (for direct
41/// querying) and [`ScrapingService`] (for DAG pipeline integration).
42///
43/// # Example
44///
45/// ```no_run
46/// use stygian_graph::adapters::database::DatabaseSource;
47///
48/// # async fn example() {
49/// let db = DatabaseSource::new("postgres://localhost/testdb").await.unwrap();
50/// println!("Connected to: {}", db.source_name());
51/// # }
52/// ```
53pub struct DatabaseSource {
54 pool: PgPool,
55 name: String,
56}
57
58impl DatabaseSource {
59 /// Connect to a `PostgreSQL` database.
60 ///
61 /// # Arguments
62 ///
63 /// * `database_url` - `PostgreSQL` connection string
64 ///
65 /// # Example
66 ///
67 /// ```no_run
68 /// use stygian_graph::adapters::database::DatabaseSource;
69 ///
70 /// # async fn example() {
71 /// let db = DatabaseSource::new("postgres://localhost/mydb").await.unwrap();
72 /// # }
73 /// ```
74 pub async fn new(database_url: &str) -> Result<Self> {
75 let pool = PgPoolOptions::new()
76 .max_connections(5)
77 .connect(database_url)
78 .await
79 .map_err(|e| {
80 StygianError::Service(ServiceError::Unavailable(format!(
81 "database connection failed: {e}"
82 )))
83 })?;
84
85 Ok(Self {
86 pool,
87 name: "postgres".to_string(),
88 })
89 }
90
91 /// Create from an existing pool (useful for testing or shared connections).
92 ///
93 /// # Example
94 ///
95 /// ```no_run
96 /// use stygian_graph::adapters::database::DatabaseSource;
97 /// use sqlx::PgPool;
98 ///
99 /// # async fn example(pool: PgPool) {
100 /// let db = DatabaseSource::from_pool(pool);
101 /// # }
102 /// ```
103 #[must_use]
104 pub fn from_pool(pool: PgPool) -> Self {
105 Self {
106 pool,
107 name: "postgres".to_string(),
108 }
109 }
110
111 /// Return the name of this source for display/logging.
112 #[must_use]
113 pub fn source_name(&self) -> &str {
114 &self.name
115 }
116}
117
118// ─────────────────────────────────────────────────────────────────────────────
119// DataSourcePort
120// ─────────────────────────────────────────────────────────────────────────────
121
122#[async_trait]
123impl DataSourcePort for DatabaseSource {
124 async fn query(&self, params: QueryParams) -> Result<Vec<Value>> {
125 let rows: Vec<sqlx::postgres::PgRow> = sqlx::query(¶ms.query)
126 .fetch_all(&self.pool)
127 .await
128 .map_err(|e| {
129 StygianError::Service(ServiceError::InvalidResponse(format!("query failed: {e}")))
130 })?;
131
132 let mut results = Vec::with_capacity(rows.len());
133 for row in &rows {
134 // Convert each row to a JSON object using column metadata
135 let columns = row.columns();
136 let mut obj = serde_json::Map::new();
137 for col in columns {
138 let name = col.name().to_string();
139 let value: Value = Self::extract_column_value(row, col);
140 obj.insert(name, value);
141 }
142 results.push(Value::Object(obj));
143
144 if let Some(limit) = params.limit
145 && results.len() as u64 >= limit
146 {
147 break;
148 }
149 }
150
151 Ok(results)
152 }
153
154 async fn healthcheck(&self) -> Result<()> {
155 sqlx::query("SELECT 1")
156 .fetch_one(&self.pool)
157 .await
158 .map_err(|e| {
159 StygianError::Service(ServiceError::Unavailable(format!(
160 "healthcheck failed: {e}"
161 )))
162 })?;
163 Ok(())
164 }
165
166 fn source_name(&self) -> &str {
167 &self.name
168 }
169}
170
171impl DatabaseSource {
172 /// Extract a column value from a `PgRow` as a `serde_json::Value`.
173 ///
174 /// Handles common Postgres types; falls back to the debug representation
175 /// for unsupported types.
176 fn extract_column_value(row: &sqlx::postgres::PgRow, col: &sqlx::postgres::PgColumn) -> Value {
177 use sqlx::Column;
178 use sqlx::TypeInfo;
179
180 let type_name = col.type_info().name();
181 let idx = col.ordinal();
182
183 match type_name {
184 "INT4" | "INT2" => row.try_get::<i32, _>(idx).map_or(Value::Null, |v| json!(v)),
185 "INT8" => row.try_get::<i64, _>(idx).map_or(Value::Null, |v| json!(v)),
186 "FLOAT4" => row.try_get::<f32, _>(idx).map_or(Value::Null, |v| json!(v)),
187 "FLOAT8" | "NUMERIC" => row.try_get::<f64, _>(idx).map_or(Value::Null, |v| json!(v)),
188 "BOOL" => row
189 .try_get::<bool, _>(idx)
190 .map_or(Value::Null, |v| json!(v)),
191 "TEXT" | "VARCHAR" | "CHAR" | "NAME" => row
192 .try_get::<String, _>(idx)
193 .map_or(Value::Null, |v| json!(v)),
194 "JSON" | "JSONB" => row.try_get::<Value, _>(idx).unwrap_or(Value::Null),
195 _ => row
196 .try_get::<String, _>(idx)
197 .map_or(Value::Null, |v| json!(v)),
198 }
199 }
200}
201
202// ─────────────────────────────────────────────────────────────────────────────
203// ScrapingService (DAG integration)
204// ─────────────────────────────────────────────────────────────────────────────
205
206#[async_trait]
207impl ScrapingService for DatabaseSource {
208 /// Execute a database query from pipeline parameters.
209 ///
210 /// Expected params:
211 /// ```json
212 /// { "query": "SELECT ...", "parameters": [...], "limit": 100 }
213 /// ```
214 ///
215 /// # Example
216 ///
217 /// ```no_run
218 /// # use stygian_graph::ports::{ScrapingService, ServiceInput};
219 /// # use stygian_graph::adapters::database::DatabaseSource;
220 /// # use serde_json::json;
221 /// # async fn example(db: DatabaseSource) {
222 /// let input = ServiceInput {
223 /// url: String::new(),
224 /// params: json!({"query": "SELECT 1 AS n", "parameters": [], "limit": 10}),
225 /// };
226 /// let result = db.execute(input).await.unwrap();
227 /// # }
228 /// ```
229 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
230 let query_str = input
231 .params
232 .get("query")
233 .and_then(|v| v.as_str())
234 .ok_or_else(|| {
235 StygianError::Service(ServiceError::InvalidResponse(
236 "missing 'query' in params".into(),
237 ))
238 })?
239 .to_string();
240
241 let parameters: Vec<Value> = input
242 .params
243 .get("parameters")
244 .and_then(|v| v.as_array())
245 .cloned()
246 .unwrap_or_default();
247
248 let limit = input.params.get("limit").and_then(Value::as_u64);
249
250 let params = QueryParams {
251 query: query_str,
252 parameters,
253 limit,
254 };
255
256 let rows = self.query(params).await?;
257 let row_count = rows.len();
258
259 Ok(ServiceOutput {
260 data: serde_json::to_string(&rows).unwrap_or_default(),
261 metadata: json!({
262 "source": self.name,
263 "row_count": row_count,
264 }),
265 })
266 }
267
268 fn name(&self) -> &'static str {
269 "database"
270 }
271}