1.2.7 Real-Time Data Processing

Process and Transform Data in Near Real Time

Kinesis Data Streams → Lambda

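| Config                 | Description                          | Range                                |
|------------------------|--------------------------------------|--------------------------------------|
| Batch Size             | Number of records per batch          | 1 → 10,000                           |
| Batch Window           | Time to wait for a batch to fill     | 0 → 300s                             |
| Parallelization Factor | Concurrent batches per shard         | 1 → 10                               |
| Starting Position      | Where to start reading in the stream | TRIM_HORIZON / LATEST / AT_TIMESTAMP |
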
import base64
import json


def handler(event, context):
    for record in event['Records']:
        # Kinesis delivers each record payload base64-encoded
        payload = base64.b64decode(record['kinesis']['data'])
        data = json.loads(payload)
        process(data)  # process() is the application's own logic
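
To try the decoding step without deploying anything, the sketch below builds a minimal event shaped like the one Lambda passes for Kinesis records and decodes it again. The helper names `make_kinesis_event` and `decode_records` are illustrative, not part of any AWS library, and the sample event carries only the `kinesis.data` field; real events include more metadata.

```python
import base64
import json


def make_kinesis_event(payloads):
    """Build a minimal Kinesis-style event (hypothetical helper for local testing)."""
    return {
        "Records": [
            {
                "kinesis": {
                    # Encode the same way the payload arrives in Lambda: base64 text
                    "data": base64.b64encode(
                        json.dumps(payload).encode("utf-8")
                    ).decode("ascii")
                }
            }
            for payload in payloads
        ]
    }


def decode_records(event):
    """Decode every record's base64 payload into a Python object."""
    return [
        json.loads(base64.b64decode(record["kinesis"]["data"]))
        for record in event["Records"]
    ]


sample_event = make_kinesis_event(
    [{"sensor": "a1", "temp": 21.5}, {"sensor": "b2", "temp": 19.0}]
)
decoded = decode_records(sample_event)
print(decoded)
```

The round trip assumes JSON payloads; a producer that writes another format (CSV, Avro, and so on) would need a different parser after the base64 step.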

DynamoDB Streams → Lambda

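| Stream View Type   | Content                                 |
|--------------------|-----------------------------------------|
| KEYS_ONLY          | Only the partition key and sort key     |
| NEW_IMAGE          | The item as it appears after the change |
| OLD_IMAGE          | The item as it appeared before the change |
| NEW_AND_OLD_IMAGES | Both the before and after images        |
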
def handler(event, context):
    for record in event['Records']:
        event_name = record['eventName']  # INSERT, MODIFY, REMOVE
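        if event_name == 'INSERT':
            # NewImage is present only with NEW_IMAGE or NEW_AND_OLD_IMAGES
            new_item = record['dynamodb']['NewImage']
            # Process new item
        elif event_name == 'MODIFY':
            # Having both images requires NEW_AND_OLD_IMAGES
            old_item = record['dynamodb']['OldImage']
            new_item = record['dynamodb']['NewImage']
            # Compare changes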
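
The "compare changes" step can be sketched as follows. Stream images arrive in DynamoDB's typed JSON form (for example `{"N": "2"}`), so the sketch first converts them to plain Python values. `from_dynamodb_json` and `changed_attributes` are hypothetical helpers that handle only the S, N, BOOL, NULL, M and L types; boto3's `TypeDeserializer` is a fuller alternative if boto3 is available.

```python
from decimal import Decimal


def from_dynamodb_json(value):
    """Convert one typed attribute value, e.g. {"N": "5"}, to a plain Python value."""
    (type_tag, raw), = value.items()
    if type_tag == "S":
        return raw
    if type_tag == "N":
        return Decimal(raw)  # numbers are transported as strings
    if type_tag == "BOOL":
        return raw
    if type_tag == "NULL":
        return None
    if type_tag == "M":
        return {k: from_dynamodb_json(v) for k, v in raw.items()}
    if type_tag == "L":
        return [from_dynamodb_json(v) for v in raw]
    raise ValueError(f"unsupported DynamoDB type tag: {type_tag}")


def changed_attributes(record):
    """Return {attribute: (old, new)} for every attribute that differs."""
    images = record["dynamodb"]
    old = {k: from_dynamodb_json(v) for k, v in images.get("OldImage", {}).items()}
    new = {k: from_dynamodb_json(v) for k, v in images.get("NewImage", {}).items()}
    return {
        key: (old.get(key), new.get(key))
        for key in sorted(old.keys() | new.keys())
        if old.get(key) != new.get(key)
    }


# Sample MODIFY record containing only the fields this sketch reads
sample_record = {
    "eventName": "MODIFY",
    "dynamodb": {
        "OldImage": {"pk": {"S": "order#1"}, "status": {"S": "PENDING"}, "qty": {"N": "2"}},
        "NewImage": {"pk": {"S": "order#1"}, "status": {"S": "SHIPPED"}, "qty": {"N": "2"}},
    },
}
changes = changed_attributes(sample_record)
print(changes)
```

This only works when the stream view type is NEW_AND_OLD_IMAGES; with any other view type at least one image is missing and every attribute would look changed.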

SQS → Lambda

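| Config          | Description                      | Range                                 |
|-----------------|----------------------------------|---------------------------------------|
| Batch Size      | Number of messages per batch     | 1 → 10,000 (Standard), 1 → 10 (FIFO)  |
| Batch Window    | Time to wait for a batch to fill | 0 → 300s                              |
| Max Concurrency | Limit on concurrent executions   | 2 → 1000                              |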
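
For comparison with the stream handlers, here is a minimal sketch of reading an SQS batch. It assumes the message bodies are JSON, which is a choice made by the producer and not something SQS enforces; `parse_messages` is an illustrative helper that a handler would call.

```python
import json


def parse_messages(event):
    """Parse each SQS message body as JSON and pair it with its message ID."""
    parsed = []
    for record in event["Records"]:
        # Unlike Kinesis data, the SQS body arrives as a plain string, not base64
        body = json.loads(record["body"])
        parsed.append({"message_id": record["messageId"], "body": body})
    return parsed


# Sample event containing only the fields this sketch reads
sample_event = {
    "Records": [
        {"messageId": "msg-1", "body": json.dumps({"order_id": 42})},
        {"messageId": "msg-2", "body": json.dumps({"order_id": 43})},
    ]
}
result = parse_messages(sample_event)
print(result)
```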

Tumbling Windows (Kinesis & DynamoDB Streams)

  • Aggregate data within a fixed time window (up to 15 minutes)
  • State is maintained between invocations
  • Use case: running totals, averages, counts

def handler(event, context):
    state = event.get('state', {})
    count = state.get('count', 0)

    for record in event['Records']:
        count += 1

    state['count'] = count

    if event.get('isFinalInvokeForWindow'):
        # The window has ended, so emit the result
        save_aggregate(count)

    return {'state': state}
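
How state travels between invocations is easier to see in a local simulation. The sketch below repeats the counting handler and feeds each returned `state` into the next event. `simulate_window` and the in-memory `emitted` list are stand-ins invented for this example, and marking the last batch as the final invoke is an assumption; how Lambda actually splits a window into invocations is decided by the service.

```python
emitted = []  # stands in for a real sink such as a database table


def save_aggregate(count):
    """Record the window result (hypothetical sink for this sketch)."""
    emitted.append(count)


def handler(event, context):
    # Start from the state returned by the previous invocation, if any
    state = event.get("state", {})
    count = state.get("count", 0) + len(event["Records"])
    state["count"] = count

    if event.get("isFinalInvokeForWindow"):
        save_aggregate(count)

    return {"state": state}


def simulate_window(batches):
    """Invoke the handler once per batch, passing each returned state forward."""
    state = {}
    for index, records in enumerate(batches):
        event = {
            "Records": records,
            "state": state,
            "isFinalInvokeForWindow": index == len(batches) - 1,
        }
        state = handler(event, None)["state"]
    return state


final_state = simulate_window(
    [[{"id": 1}, {"id": 2}], [{"id": 3}], [{"id": 4}, {"id": 5}]]
)
print(final_state, emitted)
```

The handler itself keeps nothing in memory between calls; the running count survives only because it is returned in `state` and handed back in the next event.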

Enhanced Fan-Out (Kinesis)

  • Dedicated 2 MB/s read throughput per consumer, per shard
  • Push model (SubscribeToShard) instead of pull (GetRecords)
  • Lower latency: about 70 ms, versus about 200 ms for shared-throughput consumers
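
The throughput difference is simple arithmetic, sketched below. Without enhanced fan-out, all consumers of a shard share its 2 MB/s read limit; the even split used here is a simplification, since shared consumers really compete for the limit. The function name is illustrative.

```python
SHARD_READ_MB_PER_S = 2.0  # read throughput limit of one shard


def read_throughput_per_consumer(consumers, enhanced_fan_out):
    """Approximate read throughput (MB/s) each consumer gets from one shard."""
    if consumers < 1:
        raise ValueError("consumers must be at least 1")
    if enhanced_fan_out:
        # Each registered consumer gets its own dedicated pipe
        return SHARD_READ_MB_PER_S
    # Shared mode: the shard's limit is divided among all consumers
    return SHARD_READ_MB_PER_S / consumers


for n in (1, 2, 4):
    shared = read_throughput_per_consumer(n, enhanced_fan_out=False)
    dedicated = read_throughput_per_consumer(n, enhanced_fan_out=True)
    print(f"{n} consumer(s): shared {shared} MB/s each, enhanced {dedicated} MB/s each")
```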

Exam Tip: Parallelization Factor increases throughput per shard. Tumbling Windows are for aggregation. Use Enhanced Fan-Out when multiple consumers each need dedicated throughput. DynamoDB Streams + Lambda is a common pattern for CDC (Change Data Capture).
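
To make the first point of the tip concrete: with a Parallelization Factor of N, Lambda can process up to N batches from the same shard at once, so the ceiling on concurrent batches for a stream is shards × factor. The helper below is an illustrative calculation, not an AWS API.

```python
def max_concurrent_batches(shard_count, parallelization_factor=1):
    """Upper bound on batches processed concurrently for one stream."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("parallelization_factor must be between 1 and 10")
    return shard_count * parallelization_factor


print(max_concurrent_batches(4))      # default factor of 1
print(max_concurrent_batches(4, 10))  # maximum factor
```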