Streaming Parser

Overview

The streaming parser provides memory-efficient CSV parsing for files too large to fit in memory. Unlike the batch parser, which builds a complete index of field positions, the streaming parser processes data row by row, keeping only one row in memory at a time.

Two parsing models are supported:

  • Pull Model: Request rows one at a time via next_row() or iterate with range-for
  • Push Model: Feed data chunks to the parser, callbacks are invoked for each row

When to Use Streaming

Scenario                             Use Batch Parser   Use Streaming Parser
File fits in memory                  ✓
File larger than RAM                                    ✓
Need random access to rows           ✓
Memory-constrained environment                          ✓
Processing data as it arrives                           ✓
Need column extraction/aggregation   ✓

Quick Start

Range-based Iteration (Simplest)

#include <streaming.h>
#include <iostream>

int main() {
    libvroom::StreamReader reader("large_file.csv");

    for (const auto& row : reader) {
        // Access fields by index
        std::cout << row[0].str() << ", " << row[1].str() << "\n";
    }

    return 0;
}

Explicit Iteration

#include <streaming.h>
#include <iostream>

int main() {
    libvroom::StreamReader reader("large_file.csv");

    // Look up the target column once the header is available
    int col_idx = -1;

    while (reader.next_row()) {
        const auto& row = reader.row();

        // Print header names on the first row (rows_read() is 1 after
        // the first successful next_row())
        if (reader.rows_read() == 1) {
            for (const auto& name : reader.header()) {
                std::cout << name << " ";
            }
            std::cout << "\n";
            col_idx = reader.column_index("value");
        }

        // Access fields by column name
        if (col_idx >= 0) {
            std::cout << row[col_idx].str() << "\n";
        }
    }

    std::cout << "Processed " << reader.rows_read() << " rows\n";
    return 0;
}

StreamReader API

Construction

#include <streaming.h>

// From file path
libvroom::StreamReader reader("data.csv");

// From file path with configuration
libvroom::StreamConfig config;
config.dialect = libvroom::Dialect::tsv();
config.error_mode = libvroom::ErrorMode::FAIL_FAST;
libvroom::StreamReader reader("data.tsv", config);

// From input stream
std::ifstream file("data.csv", std::ios::binary);
libvroom::StreamReader reader(file);

Configuration Options

struct StreamConfig {
    Dialect dialect = Dialect::csv();         // CSV dialect settings
    ErrorMode error_mode = ErrorMode::PERMISSIVE;  // Error handling mode

    size_t chunk_size = 64 * 1024;            // Read chunk size (64KB)
    size_t max_field_size = 16 * 1024 * 1024; // Max field size (16MB)
    size_t initial_field_capacity = 64;       // Initial fields per row

    bool parse_header = true;     // Parse first row as header
    bool skip_empty_rows = false; // Skip rows with no fields
};
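
As a concrete sketch (the file name and values are illustrative), these options can be combined to read a headerless file that may contain blank lines:

```cpp
// Sketch: a headerless file that may contain blank lines.
libvroom::StreamConfig config;
config.parse_header = false;          // treat the first row as data
config.skip_empty_rows = true;        // drop rows with no fields
config.chunk_size = 256 * 1024;       // larger reads for fast storage
config.max_field_size = 1024 * 1024;  // reject fields over 1MB

libvroom::StreamReader reader("headerless.csv", config);
```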

Reading Rows

// next_row() returns true if a row was read
while (reader.next_row()) {
    const auto& row = reader.row();
    // Process row...
}

// Check end of file
if (reader.eof()) {
    std::cout << "Reached end of file\n";
}

// Get statistics
std::cout << "Rows: " << reader.rows_read() << "\n";
std::cout << "Bytes: " << reader.bytes_read() << "\n";

Accessing Header

// Get all header names
const auto& headers = reader.header();
for (const auto& name : headers) {
    std::cout << name << "\n";
}

// Get column index by name
int idx = reader.column_index("email");  // -1 if not found
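
Because the column layout is fixed for the whole file, the lookup only needs to happen once; the cached index can then be reused for every row (a sketch using only the calls shown above; the column name is illustrative):

```cpp
// Resolve name -> index once, then reuse it for every row.
libvroom::StreamReader reader("data.csv");
int email_idx = -1;

while (reader.next_row()) {
    if (email_idx < 0) {
        email_idx = reader.column_index("email");
        if (email_idx < 0) {
            std::cerr << "no 'email' column\n";
            break;
        }
    }
    std::cout << reader.row()[email_idx].str() << "\n";
}
```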

Error Handling

// Check for errors after processing
const auto& errors = reader.error_collector();

if (errors.has_errors()) {
    std::cerr << "Errors: " << errors.error_count() << "\n";
    std::cerr << errors.summary() << "\n";
}
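
The mode chosen in StreamConfig determines when those errors surface. A sketch of a permissive run, assuming (from the mode names) that PERMISSIVE records errors in the collector and keeps going, while FAIL_FAST stops at the first one:

```cpp
// Sketch: keep parsing past malformed rows, then report at the end.
libvroom::StreamConfig config;
config.error_mode = libvroom::ErrorMode::PERMISSIVE;

libvroom::StreamReader reader("messy.csv", config);
size_t good_rows = 0;
while (reader.next_row()) {
    ++good_rows;  // rows that parsed despite any collected errors
}

const auto& errors = reader.error_collector();
if (errors.has_errors()) {
    std::cerr << good_rows << " rows parsed, "
              << errors.error_count() << " errors:\n"
              << errors.summary() << "\n";
}
```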

Row and Field API

Row Access

const auto& row = reader.row();

// Field count
size_t n = row.field_count();

// Access by index (no bounds checking)
const auto& field = row[0];

// Access by index (with bounds checking)
const auto& field = row.at(0);

// Access by column name (requires header parsing)
const auto& field = row["column_name"];

// Row metadata
size_t row_num = row.row_number();    // 1-based
size_t offset = row.byte_offset();    // Byte position in file

// Iteration
for (const auto& field : row) {
    std::cout << field.str() << " ";
}

Field Access

const auto& field = row[0];

// Get field data
std::string_view view = field.data;   // Zero-copy view
std::string str = field.str();         // Allocates string

// Get unescaped content (handles "" escapes)
std::string clean = field.unescaped();

// Field metadata
bool quoted = field.is_quoted;
size_t index = field.field_index;
bool empty = field.empty();
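
A common pattern uses that metadata to avoid needless work: only quoted fields can contain "" escapes, so unescaping can be skipped for everything else (a sketch; it assumes unescaped() does strictly more work than str()):

```cpp
// Unescape only when the field was quoted; unquoted fields
// cannot contain "" escapes.
const auto& field = row[0];
std::string value = field.is_quoted ? field.unescaped() : field.str();
```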

Push Model (StreamParser)

For more control over data feeding, use StreamParser directly:

#include <streaming.h>
#include <fstream>
#include <iostream>

int main() {
    libvroom::StreamParser parser;

    // Set row callback
    parser.set_row_handler([](const libvroom::Row& row) {
        std::cout << row[0].str() << "\n";
        return true;  // Continue parsing
    });

    // Set error callback (optional)
    parser.set_error_handler([](const libvroom::ParseError& error) {
        std::cerr << "Error at line " << error.line << ": " << error.message << "\n";
        return true;  // Continue parsing
    });

    // Feed data chunks
    std::ifstream file("large.csv", std::ios::binary);
    char buffer[65536];

    while (file.read(buffer, sizeof(buffer)) || file.gcount() > 0) {
        parser.parse_chunk(buffer, file.gcount());
    }

    // Signal end of input
    parser.finish();

    std::cout << "Processed " << parser.rows_processed() << " rows\n";
    return 0;
}

Pull Model with StreamParser

#include <streaming.h>
#include <iostream>
#include <string>

int main() {
    libvroom::StreamParser parser;

    // Feed some data
    std::string data = "a,b,c\n1,2,3\n4,5,6\n";
    parser.parse_chunk(data);
    parser.finish();

    // Pull rows one at a time
    while (parser.next_row() == libvroom::StreamStatus::ROW_READY) {
        const auto& row = parser.current_row();
        std::cout << row[0].str() << "\n";
    }

    return 0;
}

StreamStatus Values

The streaming API returns status codes:

Status           Meaning
OK               Operation succeeded
ROW_READY        A complete row is available
END_OF_DATA      No more data to process
NEED_MORE_DATA   Parser needs more input
ERROR            Parse error occurred
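
Putting these together, an incremental pull loop drains every complete row after each chunk, treating NEED_MORE_DATA as the signal to feed more input (a sketch: read_next_chunk() and process() are hypothetical placeholders, not part of the API):

```cpp
libvroom::StreamParser parser;
std::vector<char> buffer;

while (read_next_chunk(buffer)) {   // read_next_chunk() is hypothetical
    parser.parse_chunk(buffer.data(), buffer.size());

    // Drain every complete row before asking for more input.
    while (parser.next_row() == libvroom::StreamStatus::ROW_READY) {
        process(parser.current_row());  // process() is hypothetical
    }
    // Reaching here with NEED_MORE_DATA means the current row
    // continues in the next chunk.
}

parser.finish();  // flush a final row with no trailing newline
while (parser.next_row() == libvroom::StreamStatus::ROW_READY) {
    process(parser.current_row());
}
```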

Memory Efficiency

The streaming parser is designed for minimal memory usage:

  • Only one row is kept in memory at a time
  • Field data uses zero-copy string views where possible
  • Internal buffer grows only as needed
  • Configurable chunk size for I/O (default: 64KB)
  • Configurable maximum field size to prevent memory exhaustion

Memory Usage Comparison

For a 10GB file with 1000 columns:

Parser             Memory Usage
Batch Parser       ~10GB+ (file + index)
Streaming Parser   ~64KB (chunk size) + row buffer

Performance Considerations

  1. Chunk size: Larger chunks reduce I/O overhead but use more memory. Default 64KB is a good balance.

  2. Single-threaded: The streaming parser is single-threaded. For maximum throughput on large files that fit in memory, use the batch parser with multi-threading.

  3. No random access: You can only iterate forward through the file. To process specific rows, you must read through preceding rows.

  4. Header parsing: If you know the file has no header, set parse_header = false to treat the first row as data.
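
Point 3 in practice: reaching a specific row means reading and discarding everything before it (a sketch; the target row number is illustrative):

```cpp
// No random access: reaching row N requires reading the N-1 rows
// before it.
libvroom::StreamReader reader("data.csv");
const size_t target = 1'000'000;

while (reader.next_row()) {
    if (reader.rows_read() == target) {
        std::cout << reader.row()[0].str() << "\n";
        break;
    }
}
```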

Complete Example

#include <streaming.h>
#include <iostream>
#include <unordered_map>

int main(int argc, char* argv[]) {
    if (argc < 2) {
        std::cerr << "Usage: " << argv[0] << " <file.csv>\n";
        return 1;
    }

    // Configure for tab-separated, strict error handling
    libvroom::StreamConfig config;
    config.dialect = libvroom::Dialect::tsv();
    config.error_mode = libvroom::ErrorMode::FAIL_FAST;

    try {
        libvroom::StreamReader reader(argv[1], config);

        // Aggregate example: count values in first column
        std::unordered_map<std::string, size_t> counts;

        for (const auto& row : reader) {
            counts[row[0].str()]++;
        }

        // Print results
        for (const auto& [value, count] : counts) {
            std::cout << value << ": " << count << "\n";
        }

        std::cout << "\nTotal rows: " << reader.rows_read() << "\n";
        std::cout << "Total bytes: " << reader.bytes_read() << "\n";

        // Check for errors
        if (reader.error_collector().has_errors()) {
            std::cerr << reader.error_collector().summary() << "\n";
            return 1;
        }

    } catch (const std::runtime_error& e) {
        std::cerr << "Error: " << e.what() << "\n";
        return 1;
    }

    return 0;
}

See Also