stygian_graph/adapters/
output_format.rs1use crate::domain::error::{Result, ServiceError, StygianError};
7use crate::ports::storage::{OutputFormat, OutputFormatter, StorageRecord};
8
9pub struct JsonlFormatter;
30
31impl OutputFormatter for JsonlFormatter {
32 fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
33 let mut out = Vec::new();
34 for record in records {
35 let line = serde_json::to_string(record).map_err(|e| {
36 StygianError::Service(ServiceError::InvalidResponse(format!(
37 "JSONL serialisation error: {e}"
38 )))
39 })?;
40 out.extend_from_slice(line.as_bytes());
41 out.push(b'\n');
42 }
43 Ok(out)
44 }
45
46 fn format_type(&self) -> OutputFormat {
47 OutputFormat::Jsonl
48 }
49}
50
51pub struct JsonFormatter;
72
73impl OutputFormatter for JsonFormatter {
74 fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
75 let mut out = serde_json::to_vec_pretty(records).map_err(|e| {
76 StygianError::Service(ServiceError::InvalidResponse(format!(
77 "JSON serialisation error: {e}"
78 )))
79 })?;
80 out.push(b'\n');
81 Ok(out)
82 }
83
84 fn format_type(&self) -> OutputFormat {
85 OutputFormat::Json
86 }
87}
88
89pub struct CsvFormatter;
112
113impl OutputFormatter for CsvFormatter {
114 fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
115 let mut wtr = csv::WriterBuilder::new()
116 .has_headers(true)
117 .from_writer(Vec::new());
118
119 wtr.write_record(["id", "pipeline_id", "node_name", "timestamp_ms", "data"])
121 .map_err(|e| {
122 StygianError::Service(ServiceError::InvalidResponse(format!(
123 "CSV header error: {e}"
124 )))
125 })?;
126
127 for record in records {
128 let data_str = serde_json::to_string(&record.data).map_err(|e| {
129 StygianError::Service(ServiceError::InvalidResponse(format!(
130 "CSV data serialisation error: {e}"
131 )))
132 })?;
133 wtr.write_record([
134 &record.id,
135 &record.pipeline_id,
136 &record.node_name,
137 &record.timestamp_ms.to_string(),
138 &data_str,
139 ])
140 .map_err(|e| {
141 StygianError::Service(ServiceError::InvalidResponse(format!(
142 "CSV write error: {e}"
143 )))
144 })?;
145 }
146
147 let bytes = wtr.into_inner().map_err(|e| {
148 StygianError::Service(ServiceError::InvalidResponse(format!(
149 "CSV finalisation error: {e}"
150 )))
151 })?;
152
153 Ok(bytes)
154 }
155
156 fn format_type(&self) -> OutputFormat {
157 OutputFormat::Csv
158 }
159}
160
161#[must_use]
177pub fn formatter_for(format: OutputFormat) -> Box<dyn OutputFormatter> {
178 match format {
179 OutputFormat::Jsonl => Box::new(JsonlFormatter),
180 OutputFormat::Json => Box::new(JsonFormatter),
181 OutputFormat::Csv => Box::new(CsvFormatter),
182 }
183}
184
#[cfg(test)]
// Panicking on an unexpected `None`/`Err`/out-of-range index is exactly how a
// test should fail, so these lints are relaxed for the test module only.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Header row every CSV export is expected to start with.
    const CSV_HEADER: &str = "id,pipeline_id,node_name,timestamp_ms,data";

    /// Runs `formatter` over `records` and decodes the output as UTF-8.
    fn render(formatter: &dyn OutputFormatter, records: &[StorageRecord]) -> String {
        let bytes = formatter.format(records).unwrap();
        String::from_utf8(bytes).unwrap()
    }

    #[test]
    fn jsonl_produces_one_line_per_record() {
        let records = [
            StorageRecord::new("p", "n", json!({"a": 1})),
            StorageRecord::new("p", "n", json!({"b": 2})),
        ];
        let text = render(&JsonlFormatter, &records);

        // Ignore the trailing newline(s) so the split does not yield an empty
        // final segment.
        let body = text.trim_end_matches('\n');
        assert_eq!(body.split('\n').count(), 2);
        for line in body.split('\n') {
            let _: StorageRecord = serde_json::from_str(line).expect("valid JSONL");
        }
    }

    #[test]
    fn json_produces_array() {
        let records = [StorageRecord::new("p", "n", json!({"x": 42}))];
        let text = render(&JsonFormatter, &records);

        assert!(text.starts_with('['), "should start with [");
        let _: Vec<StorageRecord> = serde_json::from_str(text.trim()).expect("valid JSON array");
    }

    #[test]
    fn csv_has_header_and_row() {
        let records = [StorageRecord::new("pipe-1", "node-a", json!({"k": "v"}))];
        let text = render(&CsvFormatter, &records);

        let rows: Vec<&str> = text.lines().collect();
        assert_eq!(rows[0], CSV_HEADER);
        assert!(rows[1].contains("pipe-1"));
        assert!(rows[1].contains("node-a"));
    }

    #[test]
    fn csv_empty_records_only_header() {
        let text = render(&CsvFormatter, &[]);

        let rows: Vec<&str> = text.lines().collect();
        assert_eq!(rows, [CSV_HEADER]);
    }

    #[test]
    fn formatter_for_selects_correct_type() {
        // Each pair is (format requested from the factory, format it must report).
        let cases = [
            (OutputFormat::Jsonl, OutputFormat::Jsonl),
            (OutputFormat::Json, OutputFormat::Json),
            (OutputFormat::Csv, OutputFormat::Csv),
        ];
        for (requested, expected) in cases {
            assert_eq!(formatter_for(requested).format_type(), expected);
        }
    }
}