Əsas məzmuna keçin

Kafka Internals

  • Distributed Event Streaming: Kafka paylanmış event streaming platformasıdır.
  • High Throughput: Saniyədə milyonlarla mesaj işləyə bilir.
  • Horizontal Scalability: Broker əlavə edərək asanlıqla genişlənə bilir.
  • Fault Tolerance: Replication vasitəsilə yüksək əlçatanlıq təmin edir.
  • Durability: Mesajları diskə yazır və saxlayır.
  • Low Latency: Millisaniyə səviyyəsində gecikməylə işləyir.
  • Distributed Architecture: Cluster şəklində işləyir.
  • Pull-based Model: Consumer-lər mesajları özləri çəkir (pull).

Əsas Komponentlər

1. Broker

  • Kafka server instance
  • Mesajları saxlayır və idarə edir
  • Cluster-də bir və ya bir neçə broker ola bilər
  • Hər broker unikal ID ilə identifikasiya olunur
  • Leader və Follower rolları

2. Topic

  • Mesajların kateqoriyası
  • Logical məlumat axını
  • Partitionlara bölünür
  • Retention policy ilə idarə olunur

3. Partition

  • Topic-in fiziki bölməsi
  • Sıralı, immutable mesaj ardıcıllığı
  • Hər mesajın offset-i var
  • Parallelizm üçün istifadə olunur

4. Producer

  • Mesaj göndərən tətbiq
  • Topic-ə mesaj yazır
  • Partitioning strategiyası seçir

5. Consumer

  • Mesaj oxuyan tətbiq
  • Consumer group-a aid ola bilər
  • Offset-i özü idarə edir

6. Zookeeper (və ya KRaft)

  • Cluster koordinasiyası
  • Leader election
  • Configuration management
  • (Kafka 3.0+ KRaft ilə Zookeeper-siz işləyə bilir)

Kafka Arxitekturası

┌─────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ Topic A P0 │ │ Topic A P1 │ │ Topic A P2 │ │
│ │ Topic B P0 │ │ Topic B P1 │ │ Topic A P0* │ │
│ │ Topic A P1* │ │ Topic A P2* │ │ Topic B P0* │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └─────────────────┴─────────────────┘ │
│ │ │
│ ┌───────────────┐ │
│ │ Zookeeper │ │
│ │ Ensemble │ │
│ └───────────────┘ │
└─────────────────────────────────────────────────────────┘
▲ │
│ ▼
Producers Consumers

P0, P1, P2 = Partitions, P0 = Replica

Partition və Replication

Partition Strukturu

Topic: orders (3 partitions)

Partition 0: [msg0] [msg3] [msg6] [msg9] ...
offset: 0 1 2 3

Partition 1: [msg1] [msg4] [msg7] [msg10] ...
offset: 0 1 2 3

Partition 2: [msg2] [msg5] [msg8] [msg11] ...
offset: 0 1 2 3

Replication

  • Hər partition bir neçə broker-də replicate edilir
  • Replication Factor: Replica sayı (məsələn, 3)
  • Leader Partition: Write və read əməliyyatları üçün
  • Follower Partitions: Leader-dən məlumatı kopyalayır
  • ISR (In-Sync Replicas): Leader ilə sinxron replika-lar
Broker 1: [P0-Leader]  [P1-Follower] [P2-Follower]
Broker 2: [P0-Follower] [P1-Leader] [P2-Follower]
Broker 3: [P0-Follower] [P1-Follower] [P2-Leader]

Producer Internals

Mesaj Göndərmə Prosesi

Producer → Serializer → Partitioner → Batch → Broker

Partitioner

  1. Key-based: Key-in hash-ına görə partition seçimi
  2. Round-robin: Key yoxdursa, növbə ilə
  3. Custom: Öz strategiyanız
// Key-based partitioning
producer.send(new ProducerRecord<>("topic", key, value));

// Custom partitioner
class CustomPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// Öz məntiqiniz
return partitionNumber;
}
}

Acknowledgment (acks)

  • acks=0: Producer cavab gözləmir (ən sürətli, ən az etibarlı)
  • acks=1: Leader yazana qədər gözləyir
  • acks=all: Bütün ISR-lər yazana qədər gözləyir (ən yavaş, ən etibarlı)

Batching

  • Producer mesajları batch edir
  • batch.size: Batch ölçüsü (bytes)
  • linger.ms: Batch-i göndərmək üçün gözləmə müddəti
  • Performance artırır, latency-ni bir qədər artırır

Compression

  • compression.type: none, gzip, snappy, lz4, zstd
  • Network və disk istifadəsini azaldır
  • CPU istifadəsini artırır

Consumer Internals

Consumer Group

  • Bir neçə consumer eyni topic-dən oxuyur
  • Hər partition yalnız group-dakı bir consumer tərəfindən oxunur
  • Load balancing və fault tolerance
Topic: orders (3 partitions)

Consumer Group A:
Consumer 1: P0, P1
Consumer 2: P2

Consumer Group B:
Consumer 3: P0
Consumer 4: P1
Consumer 5: P2

Rebalancing

Consumer əlavə və ya çıxarılanda partition yenidən paylanır:

Before:
Consumer 1: P0, P1, P2

After (Consumer 2 əlavə olundu):
Consumer 1: P0, P1
Consumer 2: P2

Rebalance növləri:

  • Eager Rebalance: Bütün consumer-lər partition-larını buraxır (Stop-the-world)
  • Cooperative Rebalance: Yalnız lazım olan partition-lar transfer olunur

Offset Management

  • Hər consumer offset-i commit edir
  • __consumer_offsets topic-ində saxlanılır
  • Auto-commit: Avtomatik commit (enable.auto.commit=true)
  • Manual commit: Manual idarəetmə (commitSync/commitAsync)
// Auto commit
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

// Manual commit
consumer.commitSync(); // Synchronous
consumer.commitAsync(); // Asynchronous

Offset Reset Strategy

  • earliest: Topic-in əvvəlindən oxu
  • latest: Yalnız yeni mesajları oxu
  • none: Offset yoxdursa exception at

Log Structure

Segment Files

Partition log segment-lərə bölünür:

/var/kafka-logs/topic-0/
00000000000000000000.log (0-999 offset)
00000000000000001000.log (1000-1999 offset)
00000000000000002000.log (2000-2999 offset)
00000000000000000000.index
00000000000000001000.index
00000000000000002000.index

Log Retention

  • Time-based: log.retention.ms, log.retention.hours
  • Size-based: log.retention.bytes
  • Compaction: log.cleanup.policy=compact

Log Compaction

Before compaction:
offset: 0 1 2 3 4 5 6
key: A B C A B D C
value: v1 v2 v3 v4 v5 v6 v7

After compaction (hər key-in son value-si saxlanılır):
offset: 3 4 5 6
key: A B D C
value: v4 v5 v6 v7

Message Format

Message Structure

┌──────────────────────────────────────────┐
│ Offset (8 bytes) │
├──────────────────────────────────────────┤
│ Message Size (4 bytes) │
├──────────────────────────────────────────┤
│ CRC (4 bytes) │
├──────────────────────────────────────────┤
│ Magic Byte (1 byte) │
├──────────────────────────────────────────┤
│ Attributes (1 byte) │
├──────────────────────────────────────────┤
│ Timestamp (8 bytes) │
├──────────────────────────────────────────┤
│ Key Length (4 bytes) │
├──────────────────────────────────────────┤
│ Key (variable) │
├──────────────────────────────────────────┤
│ Value Length (4 bytes) │
├──────────────────────────────────────────┤
│ Value (variable) │
├──────────────────────────────────────────┤
│ Headers (variable) │
└──────────────────────────────────────────┘

Leader Election

Controller

  • Cluster-də bir broker controller rolunu alır
  • Leader election-u idarə edir
  • Partition və replica-ları idarə edir

Leader Election Prosesi

  1. Leader partition down olur
  2. Controller bunu detect edir
  3. ISR-dən yeni leader seçilir
  4. Yeni leader metadata yenilənir
  5. Consumer və Producer-lər yeni leader-ə yönləndirilir

Performance Optimizations

Zero-Copy

  • OS-level optimization
  • Disk-dən birbaşa network-ə transfer
  • CPU overhead-i azaldır

Sequential I/O

  • Diskə sequential yazma
  • Random I/O-dan qat-qat sürətli
  • HDD-də belə yüksək throughput

Page Cache

  • OS page cache-dən istifadə
  • Tez-tez oxunan məlumatlar memory-də
  • Kafka öz cache-ni idarə etmir

Batching

  • Producer və Consumer batch işləyir
  • Network round-trip-lərini azaldır
  • Throughput-u artırır

Configuration Parameters

Broker Configuration

# Broker ID
broker.id=1

# Log directory
log.dirs=/var/kafka-logs

# Zookeeper
zookeeper.connect=localhost:2181

# Replication
default.replication.factor=3
min.insync.replicas=2

# Retention
log.retention.hours=168 # 7 days
log.segment.bytes=1073741824 # 1GB

# Performance
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

Producer Configuration

# Acknowledgment
acks=all

# Retries
retries=3
retry.backoff.ms=100

# Batching
batch.size=16384
linger.ms=10

# Compression
compression.type=snappy

# Idempotence
enable.idempotence=true

Consumer Configuration

# Group
group.id=my-consumer-group

# Offset
enable.auto.commit=true
auto.commit.interval.ms=1000
auto.offset.reset=earliest

# Fetch
fetch.min.bytes=1
fetch.max.wait.ms=500
max.poll.records=500

Exactly-Once Semantics (EOS)

Idempotent Producer

props.put("enable.idempotence", "true");
  • Duplicate mesajları önləyir
  • Producer ID və sequence number istifadə edir

Transactional Producer

props.put("transactional.id", "my-transactional-id");

producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}

Transactional Consumer

props.put("isolation.level", "read_committed");
  • Yalnız commit edilmiş mesajları oxuyur

Monitoring Metrics

Broker Metrics

  • UnderReplicatedPartitions: ISR-də olmayan partition-lar
  • OfflinePartitionsCount: Leader-i olmayan partition-lar
  • ActiveControllerCount: Controller sayı (1 olmalı)
  • RequestsPerSecond: Saniyədə request sayı

Producer Metrics

  • record-send-rate: Göndərilən mesaj rate-i
  • record-error-rate: Error rate-i
  • request-latency-avg: Orta latency
  • compression-rate-avg: Compression rate-i

Consumer Metrics

  • records-consumed-rate: Oxunan mesaj rate-i
  • records-lag-max: Maksimum lag
  • fetch-latency-avg: Orta fetch latency
  • commit-latency-avg: Orta commit latency

Best Practices

  1. Partition Sayı: CPU core sayına əsasən (topic throughput / target throughput per partition)
  2. Replication Factor: 3 (production üçün minimum)
  3. min.insync.replicas: 2 (RF=3 olduqda)
  4. acks=all: Kritik məlumat üçün
  5. Compression: snappy və ya lz4 istifadə edin
  6. Batching: linger.ms və batch.size optimize edin
  7. Monitoring: Metrics və alerting quraşdırın
  8. Retention: Ehtiyacınıza görə retention policy təyin edin
  9. Consumer Groups: Parallelizm üçün partition sayı = consumer sayı
  10. Security: SASL/SSL aktiv edin

Common Issues

1. Consumer Lag

  • Partition sayını artırın
  • Consumer sayını artırın
  • Processing məntiqini optimize edin

2. Rebalancing

  • session.timeout.ms artırın
  • max.poll.interval.ms artırın
  • Cooperative rebalance istifadə edin

3. Data Loss

  • acks=all istifadə edin
  • min.insync.replicas≥2 təyin edin
  • Unclean leader election-u disable edin

4. Low Throughput

  • Batching optimize edin
  • Compression aktiv edin
  • Partition sayını artırın

Əlavə Resurslar