stygian_graph/adapters/database.rs
1//! Database source adapter — queries PostgreSQL as a pipeline data source.
2//!
3//! Implements [`DataSourcePort`](crate::ports::data_source::DataSourcePort) and [`ScrapingService`](crate::ports::ScrapingService) so database queries
4//! can participate in a DAG pipeline as a first-class node.
5//!
6//! Requires the `postgres` feature flag (`sqlx` dependency).
7//!
8//! # Example
9//!
10//! ```no_run
11//! use stygian_graph::adapters::database::DatabaseSource;
12//! use stygian_graph::ports::data_source::{DataSourcePort, QueryParams};
13//! use serde_json::json;
14//!
15//! # async fn example() {
16//! let db = DatabaseSource::new("postgres://user:pass@localhost/mydb").await.unwrap();
17//! let rows = db.query(QueryParams {
18//! query: "SELECT id, name FROM users LIMIT 10".into(),
19//! parameters: vec![],
20//! limit: Some(10),
21//! }).await.unwrap();
22//! # }
23//! ```
24
25use async_trait::async_trait;
26use serde_json::{Value, json};
27use sqlx::postgres::PgPoolOptions;
28use sqlx::{Column, PgPool, Row};
29
30use crate::domain::error::{Result, ServiceError, StygianError};
31use crate::ports::data_source::{DataSourcePort, QueryParams};
32use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
33
34// ─────────────────────────────────────────────────────────────────────────────
35// DatabaseSource
36// ─────────────────────────────────────────────────────────────────────────────
37
38/// Adapter: `PostgreSQL` database as a pipeline data source.
39///
40/// Wraps a `sqlx::PgPool` and implements both [`DataSourcePort`] (for direct
41/// querying) and [`ScrapingService`] (for DAG pipeline integration).
42///
43/// # Example
44///
45/// ```no_run
46/// use stygian_graph::adapters::database::DatabaseSource;
47///
48/// # async fn example() {
49/// let db = DatabaseSource::new("postgres://localhost/testdb").await.unwrap();
50/// println!("Connected to: {}", db.source_name());
51/// # }
52/// ```
53pub struct DatabaseSource {
54 pool: PgPool,
55 name: String,
56}
57
58impl DatabaseSource {
59 /// Connect to a `PostgreSQL` database.
60 ///
61 /// # Arguments
62 ///
63 /// * `database_url` - `PostgreSQL` connection string
64 ///
65 /// # Example
66 ///
67 /// ```no_run
68 /// use stygian_graph::adapters::database::DatabaseSource;
69 ///
70 /// # async fn example() {
71 /// let db = DatabaseSource::new("postgres://localhost/mydb").await.unwrap();
72 /// # }
73 /// ```
74 ///
75 /// # Errors
76 ///
77 /// Returns [`StygianError`] wrapping `ServiceError::Unavailable` when the
78 /// supplied connection URL is malformed or the database is unreachable.
79 pub async fn new(database_url: &str) -> Result<Self> {
80 let pool = PgPoolOptions::new()
81 .max_connections(5)
82 .connect(database_url)
83 .await
84 .map_err(|e| {
85 StygianError::Service(ServiceError::Unavailable(format!(
86 "database connection failed: {e}"
87 )))
88 })?;
89
90 Ok(Self {
91 pool,
92 name: "postgres".to_string(),
93 })
94 }
95
96 /// Create from an existing pool (useful for testing or shared connections).
97 ///
98 /// # Example
99 ///
100 /// ```no_run
101 /// use stygian_graph::adapters::database::DatabaseSource;
102 /// use sqlx::PgPool;
103 ///
104 /// # async fn example(pool: PgPool) {
105 /// let db = DatabaseSource::from_pool(pool);
106 /// # }
107 /// ```
108 #[must_use]
109 pub fn from_pool(pool: PgPool) -> Self {
110 Self {
111 pool,
112 name: "postgres".to_string(),
113 }
114 }
115
116 /// Return the name of this source for display/logging.
117 #[must_use]
118 pub fn source_name(&self) -> &str {
119 &self.name
120 }
121}
122
123// ─────────────────────────────────────────────────────────────────────────────
124// DataSourcePort
125// ─────────────────────────────────────────────────────────────────────────────
126
127#[async_trait]
128impl DataSourcePort for DatabaseSource {
129 async fn query(&self, params: QueryParams) -> Result<Vec<Value>> {
130 let rows: Vec<sqlx::postgres::PgRow> = sqlx::query(¶ms.query)
131 .fetch_all(&self.pool)
132 .await
133 .map_err(|e| {
134 StygianError::Service(ServiceError::InvalidResponse(format!("query failed: {e}")))
135 })?;
136
137 let mut results = Vec::with_capacity(rows.len());
138 for row in &rows {
139 // Convert each row to a JSON object using column metadata
140 let columns = row.columns();
141 let mut obj = serde_json::Map::new();
142 for col in columns {
143 let name = col.name().to_string();
144 let value: Value = Self::extract_column_value(row, col);
145 obj.insert(name, value);
146 }
147 results.push(Value::Object(obj));
148
149 if let Some(limit) = params.limit
150 && results.len() as u64 >= limit
151 {
152 break;
153 }
154 }
155
156 Ok(results)
157 }
158
159 async fn healthcheck(&self) -> Result<()> {
160 sqlx::query("SELECT 1")
161 .fetch_one(&self.pool)
162 .await
163 .map_err(|e| {
164 StygianError::Service(ServiceError::Unavailable(format!(
165 "healthcheck failed: {e}"
166 )))
167 })?;
168 Ok(())
169 }
170
171 fn source_name(&self) -> &str {
172 &self.name
173 }
174}
175
176impl DatabaseSource {
177 /// Extract a column value from a `PgRow` as a `serde_json::Value`.
178 ///
179 /// Handles common Postgres types; falls back to the debug representation
180 /// for unsupported types.
181 fn extract_column_value(row: &sqlx::postgres::PgRow, col: &sqlx::postgres::PgColumn) -> Value {
182 use sqlx::Column;
183 use sqlx::TypeInfo;
184
185 let type_name = col.type_info().name();
186 let idx = col.ordinal();
187
188 match type_name {
189 "INT4" | "INT2" => row.try_get::<i32, _>(idx).map_or(Value::Null, |v| json!(v)),
190 "INT8" => row.try_get::<i64, _>(idx).map_or(Value::Null, |v| json!(v)),
191 "FLOAT4" => row.try_get::<f32, _>(idx).map_or(Value::Null, |v| json!(v)),
192 "FLOAT8" | "NUMERIC" => row.try_get::<f64, _>(idx).map_or(Value::Null, |v| json!(v)),
193 "BOOL" => row
194 .try_get::<bool, _>(idx)
195 .map_or(Value::Null, |v| json!(v)),
196 "TEXT" | "VARCHAR" | "CHAR" | "NAME" => row
197 .try_get::<String, _>(idx)
198 .map_or(Value::Null, |v| json!(v)),
199 "JSON" | "JSONB" => row.try_get::<Value, _>(idx).unwrap_or(Value::Null),
200 _ => row
201 .try_get::<String, _>(idx)
202 .map_or(Value::Null, |v| json!(v)),
203 }
204 }
205}
206
207// ─────────────────────────────────────────────────────────────────────────────
208// ScrapingService (DAG integration)
209// ─────────────────────────────────────────────────────────────────────────────
210
211#[async_trait]
212impl ScrapingService for DatabaseSource {
213 /// Execute a database query from pipeline parameters.
214 ///
215 /// Expected params:
216 /// ```json
217 /// { "query": "SELECT ...", "parameters": [...], "limit": 100 }
218 /// ```
219 ///
220 /// # Example
221 ///
222 /// ```no_run
223 /// # use stygian_graph::ports::{ScrapingService, ServiceInput};
224 /// # use stygian_graph::adapters::database::DatabaseSource;
225 /// # use serde_json::json;
226 /// # async fn example(db: DatabaseSource) {
227 /// let input = ServiceInput {
228 /// url: String::new(),
229 /// params: json!({"query": "SELECT 1 AS n", "parameters": [], "limit": 10}),
230 /// };
231 /// let result = db.execute(input).await.unwrap();
232 /// # }
233 /// ```
234 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
235 let query_str = input
236 .params
237 .get("query")
238 .and_then(|v| v.as_str())
239 .ok_or_else(|| {
240 StygianError::Service(ServiceError::InvalidResponse(
241 "missing 'query' in params".into(),
242 ))
243 })?
244 .to_string();
245
246 let parameters: Vec<Value> = input
247 .params
248 .get("parameters")
249 .and_then(|v| v.as_array())
250 .cloned()
251 .unwrap_or_default();
252
253 let limit = input.params.get("limit").and_then(Value::as_u64);
254
255 let params = QueryParams {
256 query: query_str,
257 parameters,
258 limit,
259 };
260
261 let rows = self.query(params).await?;
262 let row_count = rows.len();
263
264 Ok(ServiceOutput {
265 data: serde_json::to_string(&rows).unwrap_or_default(),
266 metadata: json!({
267 "source": self.name,
268 "row_count": row_count,
269 }),
270 })
271 }
272
273 fn name(&self) -> &'static str {
274 "database"
275 }
276}