stygian_graph/adapters/
output_format.rs1use crate::domain::error::{Result, ServiceError, StygianError};
7use crate::ports::storage::{OutputFormat, OutputFormatter, StorageRecord};
8
9pub struct JsonlFormatter;
30
31impl OutputFormatter for JsonlFormatter {
32 fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
33 let mut out = Vec::new();
34 for record in records {
35 let line = serde_json::to_string(record).map_err(|e| {
36 StygianError::Service(ServiceError::InvalidResponse(format!(
37 "JSONL serialisation error: {e}"
38 )))
39 })?;
40 out.extend_from_slice(line.as_bytes());
41 out.push(b'\n');
42 }
43 Ok(out)
44 }
45
46 fn format_type(&self) -> OutputFormat {
47 OutputFormat::Jsonl
48 }
49}
50
51pub struct JsonFormatter;
72
73impl OutputFormatter for JsonFormatter {
74 fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
75 let mut out = serde_json::to_vec_pretty(records).map_err(|e| {
76 StygianError::Service(ServiceError::InvalidResponse(format!(
77 "JSON serialisation error: {e}"
78 )))
79 })?;
80 out.push(b'\n');
81 Ok(out)
82 }
83
84 fn format_type(&self) -> OutputFormat {
85 OutputFormat::Json
86 }
87}
88
89pub struct CsvFormatter;
112
113impl OutputFormatter for CsvFormatter {
114 fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
115 let mut wtr = csv::WriterBuilder::new()
116 .has_headers(true)
117 .from_writer(Vec::new());
118
119 wtr.write_record(["id", "pipeline_id", "node_name", "timestamp_ms", "data"])
121 .map_err(|e| {
122 StygianError::Service(ServiceError::InvalidResponse(format!(
123 "CSV header error: {e}"
124 )))
125 })?;
126
127 for record in records {
128 let data_str = serde_json::to_string(&record.data).map_err(|e| {
129 StygianError::Service(ServiceError::InvalidResponse(format!(
130 "CSV data serialisation error: {e}"
131 )))
132 })?;
133 wtr.write_record([
134 &record.id,
135 &record.pipeline_id,
136 &record.node_name,
137 &record.timestamp_ms.to_string(),
138 &data_str,
139 ])
140 .map_err(|e| {
141 StygianError::Service(ServiceError::InvalidResponse(format!(
142 "CSV write error: {e}"
143 )))
144 })?;
145 }
146
147 let bytes = wtr.into_inner().map_err(|e| {
148 StygianError::Service(ServiceError::InvalidResponse(format!(
149 "CSV finalisation error: {e}"
150 )))
151 })?;
152
153 Ok(bytes)
154 }
155
156 fn format_type(&self) -> OutputFormat {
157 OutputFormat::Csv
158 }
159}
160
161pub fn formatter_for(format: OutputFormat) -> Box<dyn OutputFormatter> {
177 match format {
178 OutputFormat::Jsonl => Box::new(JsonlFormatter),
179 OutputFormat::Json => Box::new(JsonFormatter),
180 OutputFormat::Csv => Box::new(CsvFormatter),
181 }
182}
183
#[cfg(test)]
// Test code may panic on failure, so unwrap/expect/indexing are permitted here.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Header row expected at the top of every CSV output.
    const CSV_HEADER: &str = "id,pipeline_id,node_name,timestamp_ms,data";

    /// Runs `formatter` over `records` and decodes the produced bytes as UTF-8.
    fn render(formatter: &dyn OutputFormatter, records: &[StorageRecord]) -> String {
        let bytes = formatter.format(records).unwrap();
        String::from_utf8(bytes).unwrap()
    }

    #[test]
    fn jsonl_produces_one_line_per_record() {
        let records = vec![
            StorageRecord::new("p", "n", json!({"a": 1})),
            StorageRecord::new("p", "n", json!({"b": 2})),
        ];

        let text = render(&JsonlFormatter, &records);
        let lines: Vec<&str> = text.trim_end_matches('\n').split('\n').collect();

        assert_eq!(lines.len(), 2);
        // Every line must round-trip back into a record on its own.
        for line in lines {
            let _: StorageRecord = serde_json::from_str(line).expect("valid JSONL");
        }
    }

    #[test]
    fn json_produces_array() {
        let records = vec![StorageRecord::new("p", "n", json!({"x": 42}))];

        let text = render(&JsonFormatter, &records);

        assert!(text.starts_with('['), "should start with [");
        let _: Vec<StorageRecord> = serde_json::from_str(text.trim()).expect("valid JSON array");
    }

    #[test]
    fn csv_has_header_and_row() {
        let records = vec![StorageRecord::new("pipe-1", "node-a", json!({"k": "v"}))];

        let text = render(&CsvFormatter, &records);
        let mut lines = text.lines();

        assert_eq!(lines.next().unwrap(), CSV_HEADER);
        let row = lines.next().unwrap();
        assert!(row.contains("pipe-1"));
        assert!(row.contains("node-a"));
    }

    #[test]
    fn csv_empty_records_only_header() {
        let text = render(&CsvFormatter, &[]);
        let lines: Vec<&str> = text.lines().collect();

        assert_eq!(lines.len(), 1);
        assert_eq!(lines[0], CSV_HEADER);
    }

    #[test]
    fn formatter_for_selects_correct_type() {
        // (requested, expected) pairs, checked in the same order as before.
        let cases = [
            (OutputFormat::Jsonl, OutputFormat::Jsonl),
            (OutputFormat::Json, OutputFormat::Json),
            (OutputFormat::Csv, OutputFormat::Csv),
        ];

        for (requested, expected) in cases {
            assert_eq!(formatter_for(requested).format_type(), expected);
        }
    }
}