ATON Format PHP - Encode and decode values using the ATON format

File: docs/STREAMING.md

ATON Streaming Encoder Guide

Overview

The Streaming Encoder processes large datasets in manageable chunks, enabling:

  • Memory-efficient processing of large datasets
  • Progressive data transmission
  • Resumable encoding operations
  • Real-time progress tracking

Basic Usage

use Aton\StreamEncoder;
use Aton\Enums\CompressionMode;

$encoder = new StreamEncoder(
    chunkSize: 100,
    compression: CompressionMode::BALANCED
);

$data = [
    'records' => [
        // ... thousands of records
    ]
];

foreach ($encoder->streamEncode($data) as $chunk) {
    // Process each chunk
    sendToAPI($chunk['data']);

    echo "Progress: " . ($chunk['metadata']['progress'] * 100) . "%\n";
}

Configuration

Chunk Size

Number of records per chunk:

// Small chunks for real-time streaming
$encoder = new StreamEncoder(chunkSize: 10);

// Large chunks for batch processing
$encoder = new StreamEncoder(chunkSize: 1000);

Compression Mode

use Aton\Enums\CompressionMode;

// Fast for real-time
$encoder = new StreamEncoder(compression: CompressionMode::FAST);

// Balanced for general use
$encoder = new StreamEncoder(compression: CompressionMode::BALANCED);

// Ultra for maximum compression
$encoder = new StreamEncoder(compression: CompressionMode::ULTRA);

Chunk Structure

Each yielded chunk contains:

[
    'chunkId' => 0,           // 0-indexed chunk number
    'totalChunks' => 10,      // Total number of chunks
    'data' => '...',          // ATON-encoded string
    'isFirst' => true,        // True for first chunk
    'isLast' => false,        // True for last chunk
    'metadata' => [
        'table' => 'records',
        'recordsInChunk' => 100,
        'startIdx' => 0,
        'endIdx' => 100,
        'totalRecords' => 1000,
        'progress' => 0.1     // 0.0 to 1.0
    ]
]
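When consuming chunks, it can help to check that the ordering and the `isFirst`/`isLast` flags are mutually consistent before acting on them. A minimal consumer-side sketch; `assertChunkConsistent` is a hypothetical helper, not part of the library:

```php
// Hypothetical helper (not part of the library): verify that a chunk's
// position flags agree with its chunkId and totalChunks.
function assertChunkConsistent(array $chunk, int $expectedId): void
{
    if ($chunk['chunkId'] !== $expectedId) {
        throw new RuntimeException(
            "Out-of-order chunk: got {$chunk['chunkId']}, expected $expectedId"
        );
    }
    if ($chunk['isFirst'] !== ($chunk['chunkId'] === 0)) {
        throw new RuntimeException('isFirst flag does not match chunkId');
    }
    if ($chunk['isLast'] !== ($chunk['chunkId'] === $chunk['totalChunks'] - 1)) {
        throw new RuntimeException('isLast flag does not match totalChunks');
    }
}
```

Call it with an incrementing counter inside the consuming loop to catch dropped or reordered chunks early.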

First Chunk Format

The first chunk contains the complete schema:

@schema[id:int, name:str, value:float]
@defaults[value:0.0]

records(100):
  1, "First", 10.5
  2, "Second", 20.3
  ...

Continuation Chunks

Subsequent chunks use continuation syntax:

records+(100):
  101, "Next", 30.7
  102, "Another", 40.2
  ...
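A consumer can tell a continuation chunk from a first chunk by the `+` before the record count. A minimal sketch in plain PHP; the helper name is ours, not the library's:

```php
// Hypothetical helper: detect ATON continuation syntax such as
// "records+(100):" as opposed to a first chunk's "records(100):".
function isContinuationChunk(string $atonChunk): bool
{
    // Match "<table>+(<count>):" at the start of any line.
    return (bool) preg_match('/^\w+\+\(\d+\):/m', $atonChunk);
}
```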

Multi-Table Support

When data contains multiple tables, specify which to stream:

$data = [
    'users' => [...],
    'orders' => [...],
];

// Stream specific table
foreach ($encoder->streamEncode($data, 'users') as $chunk) {
    // ...
}

If no table name is given and the data contains multiple tables, an exception is thrown.
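To stream every table, iterate over the table names yourself. A sketch, reusing the `sendToAPI` placeholder from Basic Usage; tables are processed in input-array order:

```php
foreach (array_keys($data) as $table) {
    foreach ($encoder->streamEncode($data, $table) as $chunk) {
        // The table name is echoed back in $chunk['metadata']['table']
        sendToAPI($chunk['data']);
    }
}
```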

Progress Tracking

foreach ($encoder->streamEncode($data) as $chunk) {
    $progress = $chunk['metadata']['progress'] * 100;
    $current = $chunk['chunkId'] + 1;
    $total = $chunk['totalChunks'];

    echo "Chunk $current/$total - $progress% complete\n";

    if ($chunk['isFirst']) {
        echo "Starting transmission...\n";
    }

    if ($chunk['isLast']) {
        echo "Transmission complete!\n";
    }
}

Use Cases

API Transmission

foreach ($encoder->streamEncode($largeData) as $chunk) {
    $response = $httpClient->post('/api/data', [
        'body' => $chunk['data'],
        'headers' => [
            'X-Chunk-Id' => $chunk['chunkId'],
            'X-Total-Chunks' => $chunk['totalChunks'],
            'X-Is-Last' => $chunk['isLast'] ? 'true' : 'false'
        ]
    ]);

    if (!$response->isSuccess()) {
        throw new Exception("Failed at chunk {$chunk['chunkId']}");
    }
}

File Writing

$handle = fopen('output.aton', 'w');

foreach ($encoder->streamEncode($data) as $chunk) {
    fwrite($handle, $chunk['data']);
    if (!$chunk['isLast']) {
        fwrite($handle, "\n");
    }
}

fclose($handle);

WebSocket Streaming

foreach ($encoder->streamEncode($data) as $chunk) {
    $websocket->send(json_encode([
        'type' => 'data_chunk',
        'chunkId' => $chunk['chunkId'],
        'totalChunks' => $chunk['totalChunks'],
        'data' => $chunk['data'],
        'progress' => $chunk['metadata']['progress']
    ]));

    // Allow client to process
    usleep(10000); // 10ms
}

Database Export

$pdo = new PDO($dsn);
$stmt = $pdo->query("SELECT * FROM large_table");

// Build data array in chunks
$records = [];
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
    $records[] = $row;

    if (count($records) >= 10000) {
        $data = ['records' => $records];

        foreach ($encoder->streamEncode($data) as $chunk) {
            processChunk($chunk);
        }

        $records = [];
    }
}

// Process remaining
if (!empty($records)) {
    $data = ['records' => $records];
    foreach ($encoder->streamEncode($data) as $chunk) {
        processChunk($chunk);
    }
}

Error Handling

use Aton\Exceptions\ATONEncodingException;

try {
    foreach ($encoder->streamEncode($data) as $chunk) {
        try {
            processChunk($chunk);
        } catch (Exception $e) {
            // Log and continue or rethrow
            error_log("Failed chunk {$chunk['chunkId']}: " . $e->getMessage());
            throw $e;
        }
    }
} catch (ATONEncodingException $e) {
    echo "Encoding error: " . $e->getMessage();
}

Memory Considerations

The streaming encoder is memory-efficient because:

  1. Only one chunk is held in memory at a time
  2. It uses PHP generators (yield)
  3. The original data is sliced, not copied

However, the original data array must fit in memory. For truly massive datasets, consider:

  1. Database cursors
  2. File streaming
  3. Batch processing with multiple streamEncode calls
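For option 3, a small generator can group rows into fixed-size batches, so only one batch's worth of records is held before each `streamEncode` call. `batchRecords` is a hypothetical helper, not part of the library:

```php
// Hypothetical helper: group any iterable of rows into fixed-size batches,
// yielding each batch as soon as it is full.
function batchRecords(iterable $rows, int $batchSize): \Generator
{
    $batch = [];
    foreach ($rows as $row) {
        $batch[] = $row;
        if (count($batch) >= $batchSize) {
            yield $batch;
            $batch = [];
        }
    }
    if ($batch !== []) {
        yield $batch; // remaining partial batch
    }
}
```

Each batch can then be wrapped as `['records' => $batch]` and passed to `streamEncode`, as in the database export example above.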

Best Practices

  1. Choose an appropriate chunk size
     - Smaller for real-time (10-100)
     - Larger for batch (500-1000)
  2. Handle failures gracefully
     - Track processed chunks
     - Implement retry logic
     - Log progress for recovery
  3. Monitor memory usage
     - Use generators throughout the pipeline
     - Clear references after processing
  4. Consider compression
     - FAST for real-time
     - BALANCED for network transmission
     - ULTRA for storage
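The retry logic from item 2 can be sketched as a small wrapper around whatever sends a chunk. `$send` is any callable; the helper is ours, not part of the library:

```php
// Hypothetical retry wrapper: attempt $send($chunk) up to $maxAttempts times,
// rethrowing the last failure if every attempt fails.
function sendWithRetry(array $chunk, callable $send, int $maxAttempts = 3): void
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $send($chunk);
            return;
        } catch (Exception $e) {
            error_log("Chunk {$chunk['chunkId']} attempt $attempt failed: " . $e->getMessage());
            if ($attempt === $maxAttempts) {
                throw $e; // exhausted all attempts
            }
        }
    }
}
```

Combined with the `X-Chunk-Id` header from the API transmission example, the receiver can also deduplicate chunks that were retried after a partially successful send.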