# Streaming Parser

## Overview
The streaming parser provides memory-efficient CSV parsing for files that do not fit in memory. Unlike the batch parser, which builds a complete index of field positions, the streaming parser processes data row by row, keeping only one row in memory at a time.
Two parsing models are supported:
- **Pull Model**: Request rows one at a time via `next_row()` or iterate with a range-based `for` loop
- **Push Model**: Feed data chunks to the parser; a callback is invoked for each complete row
## When to Use Streaming
| Scenario | Use Batch Parser | Use Streaming Parser |
|---|---|---|
| File fits in memory | ✓ | |
| File larger than RAM | | ✓ |
| Need random access to rows | ✓ | |
| Memory-constrained environment | | ✓ |
| Processing data as it arrives | | ✓ |
| Need column extraction/aggregation | ✓ | |
## Quick Start

### Range-based Iteration (Simplest)
```cpp
#include <streaming.h>
#include <iostream>

int main() {
    libvroom::StreamReader reader("large_file.csv");
    for (const auto& row : reader) {
        // Access fields by index
        std::cout << row[0].str() << ", " << row[1].str() << "\n";
    }
    return 0;
}
```

### Explicit Iteration
```cpp
#include <streaming.h>
#include <iostream>

int main() {
    libvroom::StreamReader reader("large_file.csv");

    // Access header names
    for (const auto& name : reader.header()) {
        std::cout << name << " ";
    }
    std::cout << "\n";

    // Look up a column index by name once, outside the loop
    int col_idx = reader.column_index("value");

    // Access fields by column name
    while (reader.next_row()) {
        const auto& row = reader.row();
        if (col_idx >= 0) {
            std::cout << row[col_idx].str() << "\n";
        }
    }

    std::cout << "Processed " << reader.rows_read() << " rows\n";
    return 0;
}
```

## StreamReader API
### Construction
```cpp
#include <streaming.h>
#include <fstream>

// From a file path
libvroom::StreamReader reader("data.csv");

// From a file path, with configuration
libvroom::StreamConfig config;
config.dialect = libvroom::Dialect::tsv();
config.error_mode = libvroom::ErrorMode::FAIL_FAST;
libvroom::StreamReader reader("data.tsv", config);

// From an input stream
std::ifstream file("data.csv", std::ios::binary);
libvroom::StreamReader reader(file);
```

### Configuration Options
```cpp
struct StreamConfig {
    Dialect dialect = Dialect::csv();             // CSV dialect settings
    ErrorMode error_mode = ErrorMode::PERMISSIVE; // Error handling mode
    size_t chunk_size = 64 * 1024;                // Read chunk size (64KB)
    size_t max_field_size = 16 * 1024 * 1024;     // Max field size (16MB)
    size_t initial_field_capacity = 64;           // Initial fields per row
    bool parse_header = true;                     // Parse first row as header
    bool skip_empty_rows = false;                 // Skip rows with no fields
};
```

### Reading Rows
```cpp
// next_row() returns true if a row was read
while (reader.next_row()) {
    const auto& row = reader.row();
    // Process row...
}

// Check for end of file
if (reader.eof()) {
    std::cout << "Reached end of file\n";
}

// Get statistics
std::cout << "Rows: " << reader.rows_read() << "\n";
std::cout << "Bytes: " << reader.bytes_read() << "\n";
```

### Accessing Header
```cpp
// Get all header names
const auto& headers = reader.header();
for (const auto& name : headers) {
    std::cout << name << "\n";
}

// Get a column index by name
int idx = reader.column_index("email"); // -1 if not found
```

### Error Handling
```cpp
// Check for errors after processing
const auto& errors = reader.error_collector();
if (errors.has_errors()) {
    std::cerr << "Errors: " << errors.error_count() << "\n";
    std::cerr << errors.summary() << "\n";
}
```

## Row and Field API
### Row Access
```cpp
const auto& row = reader.row();

// Field count
size_t n = row.field_count();

// Access by index (no bounds checking)
const auto& field = row[0];

// Access by index (with bounds checking)
const auto& checked = row.at(0);

// Access by column name (requires header parsing)
const auto& named = row["column_name"];

// Row metadata
size_t row_num = row.row_number(); // 1-based
size_t offset = row.byte_offset(); // Byte position in file

// Iteration
for (const auto& f : row) {
    std::cout << f.str() << " ";
}
```

### Field Access
```cpp
const auto& field = row[0];

// Get field data
std::string_view view = field.data; // Zero-copy view
std::string str = field.str();      // Allocates a string

// Get unescaped content (handles "" escapes)
std::string clean = field.unescaped();

// Field metadata
bool quoted = field.is_quoted;
size_t index = field.field_index;
bool empty = field.empty();
```

## Push Model (StreamParser)
For more control over data feeding, use `StreamParser` directly:
```cpp
#include <streaming.h>
#include <fstream>
#include <iostream>

int main() {
    libvroom::StreamParser parser;

    // Set row callback
    parser.set_row_handler([](const libvroom::Row& row) {
        std::cout << row[0].str() << "\n";
        return true; // Continue parsing
    });

    // Set error callback (optional)
    parser.set_error_handler([](const libvroom::ParseError& error) {
        std::cerr << "Error at line " << error.line << ": " << error.message << "\n";
        return true; // Continue parsing
    });

    // Feed data chunks
    std::ifstream file("large.csv", std::ios::binary);
    char buffer[65536];
    while (file.read(buffer, sizeof(buffer)) || file.gcount() > 0) {
        parser.parse_chunk(buffer, file.gcount());
    }

    // Signal end of input
    parser.finish();

    std::cout << "Processed " << parser.rows_processed() << " rows\n";
    return 0;
}
```

## Pull Model with StreamParser
```cpp
#include <streaming.h>
#include <iostream>
#include <string>

int main() {
    libvroom::StreamParser parser;

    // Feed some data
    std::string data = "a,b,c\n1,2,3\n4,5,6\n";
    parser.parse_chunk(data);
    parser.finish();

    // Pull rows one at a time
    while (parser.next_row() == libvroom::StreamStatus::ROW_READY) {
        const auto& row = parser.current_row();
        std::cout << row[0].str() << "\n";
    }
    return 0;
}
```

## StreamStatus Values
The streaming API returns status codes:
| Status | Meaning |
|---|---|
| `OK` | Operation succeeded |
| `ROW_READY` | A complete row is available |
| `END_OF_DATA` | No more data to process |
| `NEED_MORE_DATA` | Parser needs more input |
| `ERROR` | Parse error occurred |
## Memory Efficiency
The streaming parser is designed for minimal memory usage:
- Only one row is kept in memory at a time
- Field data uses zero-copy string views where possible
- Internal buffer grows only as needed
- Configurable chunk size for I/O (default: 64KB)
- Configurable maximum field size to prevent memory exhaustion
### Memory Usage Comparison
For a 10GB file with 1000 columns:
| Parser | Memory Usage |
|---|---|
| Batch Parser | ~10GB+ (file + index) |
| Streaming Parser | ~64KB (chunk size) + row buffer |
## Performance Considerations
**Chunk size:** Larger chunks reduce I/O overhead but use more memory. The default of 64KB is a good balance.
**Single-threaded:** The streaming parser is single-threaded. For maximum throughput on large files that fit in memory, use the batch parser with multi-threading.
**No random access:** You can only iterate forward through the file. To process specific rows, you must first read through all preceding rows.
**Header parsing:** If you know the file has no header, set `parse_header = false` to treat the first row as data.
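For example, a headerless file might be configured like this (a sketch reusing the `StreamConfig` fields shown earlier):

```cpp
libvroom::StreamConfig config;
config.parse_header = false; // first row is data, not column names
libvroom::StreamReader reader("no_header.csv", config);
// With no header, column_index() lookups are unavailable;
// access fields by numeric index only.
```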
## Complete Example
```cpp
#include <streaming.h>
#include <iostream>
#include <string>
#include <unordered_map>

int main(int argc, char* argv[]) {
    if (argc < 2) {
        std::cerr << "Usage: " << argv[0] << " <file.tsv>\n";
        return 1;
    }

    // Configure for tab-separated input with strict error handling
    libvroom::StreamConfig config;
    config.dialect = libvroom::Dialect::tsv();
    config.error_mode = libvroom::ErrorMode::FAIL_FAST;

    try {
        libvroom::StreamReader reader(argv[1], config);

        // Aggregate example: count values in the first column
        std::unordered_map<std::string, size_t> counts;
        for (const auto& row : reader) {
            counts[row[0].str()]++;
        }

        // Print results
        for (const auto& [value, count] : counts) {
            std::cout << value << ": " << count << "\n";
        }
        std::cout << "\nTotal rows: " << reader.rows_read() << "\n";
        std::cout << "Total bytes: " << reader.bytes_read() << "\n";

        // Check for errors
        if (reader.error_collector().has_errors()) {
            std::cerr << reader.error_collector().summary() << "\n";
            return 1;
        }
    } catch (const std::runtime_error& e) {
        std::cerr << "Error: " << e.what() << "\n";
        return 1;
    }
    return 0;
}
```

## See Also
- Getting Started - Basic usage and batch parsing
- Error Handling - Error modes and recovery
- Architecture - How the parser works internally