# ATON Streaming Encoder Guide
## Overview

The Streaming Encoder processes large datasets in manageable chunks, enabling:

- Memory-efficient processing of large datasets
- Progressive data transmission
- Resumable encoding operations
- Real-time progress tracking
## Basic Usage

```php
use Aton\StreamEncoder;
use Aton\Enums\CompressionMode;

$encoder = new StreamEncoder(
    chunkSize: 100,
    compression: CompressionMode::BALANCED
);

$data = [
    'records' => [
        // ... thousands of records
    ]
];

foreach ($encoder->streamEncode($data) as $chunk) {
    // Process each chunk
    sendToAPI($chunk['data']);
    echo "Progress: " . ($chunk['metadata']['progress'] * 100) . "%\n";
}
```
## Configuration

### Chunk Size

The number of records per chunk:

```php
// Small chunks for real-time streaming
$encoder = new StreamEncoder(chunkSize: 10);

// Large chunks for batch processing
$encoder = new StreamEncoder(chunkSize: 1000);
```
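The chunk count follows directly from the chunk size: a table is split into `ceil(totalRecords / chunkSize)` chunks, with the final chunk holding any remainder. A quick sanity check (`expectedChunks` is a hypothetical helper for illustration, not part of the library):

```php
<?php
// How many chunks a given chunk size produces for a table.
// ceil() accounts for a final partial chunk.
function expectedChunks(int $totalRecords, int $chunkSize): int
{
    return (int) ceil($totalRecords / $chunkSize);
}

echo expectedChunks(1000, 100) . "\n"; // 10 full chunks
echo expectedChunks(1050, 100) . "\n"; // 11: the last chunk holds 50 records
```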
### Compression Mode

```php
use Aton\Enums\CompressionMode;

// Fast for real-time
$encoder = new StreamEncoder(compression: CompressionMode::FAST);

// Balanced for general use
$encoder = new StreamEncoder(compression: CompressionMode::BALANCED);

// Ultra for maximum compression
$encoder = new StreamEncoder(compression: CompressionMode::ULTRA);
```
## Chunk Structure

Each yielded chunk contains:

```php
[
    'chunkId' => 0,       // 0-indexed chunk number
    'totalChunks' => 10,  // Total number of chunks
    'data' => '...',      // ATON-encoded string
    'isFirst' => true,    // True for the first chunk
    'isLast' => false,    // True for the last chunk
    'metadata' => [
        'table' => 'records',
        'recordsInChunk' => 100,
        'startIdx' => 0,
        'endIdx' => 100,
        'totalRecords' => 1000,
        'progress' => 0.1 // 0.0 to 1.0
    ]
]
```
### First Chunk Format

The first chunk contains the complete schema:

```
@schema[id:int, name:str, value:float]
@defaults[value:0.0]
records(100):
1, "First", 10.5
2, "Second", 20.3
...
```

### Continuation Chunks

Subsequent chunks use continuation syntax:

```
records+(100):
101, "Next", 30.7
102, "Another", 40.2
...
```
## Multi-Table Support

When the data contains multiple tables, specify which one to stream:

```php
$data = [
    'users' => [...],
    'orders' => [...],
];

// Stream a specific table
foreach ($encoder->streamEncode($data, 'users') as $chunk) {
    // ...
}
```

If no table name is given for multi-table data, an exception is thrown.
## Progress Tracking

```php
foreach ($encoder->streamEncode($data) as $chunk) {
    $progress = $chunk['metadata']['progress'] * 100;
    $current = $chunk['chunkId'] + 1;
    $total = $chunk['totalChunks'];

    echo "Chunk $current/$total - $progress% complete\n";

    if ($chunk['isFirst']) {
        echo "Starting transmission...\n";
    }
    if ($chunk['isLast']) {
        echo "Transmission complete!\n";
    }
}
```
## Use Cases

### API Transmission

```php
foreach ($encoder->streamEncode($largeData) as $chunk) {
    $response = $httpClient->post('/api/data', [
        'body' => $chunk['data'],
        'headers' => [
            'X-Chunk-Id' => $chunk['chunkId'],
            'X-Total-Chunks' => $chunk['totalChunks'],
            'X-Is-Last' => $chunk['isLast'] ? 'true' : 'false'
        ]
    ]);

    if (!$response->isSuccess()) {
        throw new Exception("Failed at chunk {$chunk['chunkId']}");
    }
}
```
### File Writing

```php
$handle = fopen('output.aton', 'w');

foreach ($encoder->streamEncode($data) as $chunk) {
    fwrite($handle, $chunk['data']);
    if (!$chunk['isLast']) {
        fwrite($handle, "\n");
    }
}

fclose($handle);
```
### WebSocket Streaming

```php
foreach ($encoder->streamEncode($data) as $chunk) {
    $websocket->send(json_encode([
        'type' => 'data_chunk',
        'chunkId' => $chunk['chunkId'],
        'totalChunks' => $chunk['totalChunks'],
        'data' => $chunk['data'],
        'progress' => $chunk['metadata']['progress']
    ]));

    // Give the client time to process
    usleep(10000); // 10 ms
}
```
### Database Export

```php
$pdo = new PDO($dsn);
$stmt = $pdo->query("SELECT * FROM large_table");

// Build the data array in batches
$records = [];
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
    $records[] = $row;

    if (count($records) >= 10000) {
        $data = ['records' => $records];
        foreach ($encoder->streamEncode($data) as $chunk) {
            processChunk($chunk);
        }
        $records = [];
    }
}

// Process any remaining records
if (!empty($records)) {
    $data = ['records' => $records];
    foreach ($encoder->streamEncode($data) as $chunk) {
        processChunk($chunk);
    }
}
```
## Error Handling

```php
use Aton\Exceptions\ATONEncodingException;

try {
    foreach ($encoder->streamEncode($data) as $chunk) {
        try {
            processChunk($chunk);
        } catch (Exception $e) {
            // Log, then either continue or rethrow
            error_log("Failed chunk {$chunk['chunkId']}: " . $e->getMessage());
            throw $e;
        }
    }
} catch (ATONEncodingException $e) {
    echo "Encoding error: " . $e->getMessage();
}
```
## Memory Considerations

The streaming encoder is memory-efficient because:

- Only one chunk is in memory at a time
- It uses PHP generators (`yield`)
- The original data is sliced, not copied

However, the original data array must still fit in memory. For truly massive datasets, consider:

- Database cursors
- File streaming
- Batch processing with multiple `streamEncode()` calls
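The batch-processing approach can be sketched with a plain generator that yields fixed-size groups of records, each of which is then passed to a separate `streamEncode()` call, so only one batch ever sits in memory. Here `recordBatches` is a hypothetical helper written for this guide, not part of the library:

```php
<?php
// Hypothetical helper: lazily group any iterable of records into
// fixed-size batches. Only the current batch is held in memory.
function recordBatches(iterable $rows, int $batchSize): Generator
{
    $batch = [];
    foreach ($rows as $row) {
        $batch[] = $row;
        if (count($batch) >= $batchSize) {
            yield $batch;
            $batch = [];
        }
    }
    if (!empty($batch)) {
        yield $batch; // final partial batch
    }
}

// Each batch becomes its own streamEncode() call:
// foreach (recordBatches($cursor, 10000) as $records) {
//     foreach ($encoder->streamEncode(['records' => $records]) as $chunk) {
//         processChunk($chunk);
//     }
// }
```

This mirrors the Database Export example above, but works with any iterable source (a PDO statement, a CSV reader, another generator) instead of a hand-rolled accumulator loop.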
## Best Practices

- **Choose an appropriate chunk size**
  - Smaller for real-time (10-100)
  - Larger for batch (500-1000)
- **Handle failures gracefully**
  - Track processed chunks
  - Implement retry logic
  - Log progress for recovery
- **Monitor memory usage**
  - Use generators throughout the pipeline
  - Clear references after processing
- **Consider compression**
  - `FAST` for real-time
  - `BALANCED` for network transmission
  - `ULTRA` for storage
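The retry advice above can be sketched as a small wrapper around whatever function sends a chunk. `sendWithRetry`, its backoff scheme, and its defaults are assumptions for illustration, not part of the ATON library:

```php
<?php
// Hypothetical retry helper: invoke $send with the chunk, retrying on
// failure with linear backoff, and rethrow once attempts are exhausted.
function sendWithRetry(
    callable $send,
    array $chunk,
    int $maxAttempts = 3,
    int $backoffMs = 200
): void {
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $send($chunk);
            return; // success
        } catch (Exception $e) {
            error_log("Chunk {$chunk['chunkId']} attempt $attempt failed: " . $e->getMessage());
            if ($attempt === $maxAttempts) {
                // Exhausted retries; the caller can log chunkId and resume later.
                throw $e;
            }
            usleep($backoffMs * 1000 * $attempt);
        }
    }
}

// Usage:
// foreach ($encoder->streamEncode($data) as $chunk) {
//     sendWithRetry(fn ($c) => sendToAPI($c['data']), $chunk);
// }
```

Because each chunk carries its `chunkId`, logging the last successfully sent id is enough for a receiver-side resume strategy after a hard failure.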