Əsas məzmuna keçin

RabbitMQ Internals

  • Message Broker: RabbitMQ açıq mənbəli mesaj broker sistemidir.
  • AMQP Protocol: Advanced Message Queuing Protocol 0-9-1 implementasiyası.
  • Reliable Messaging: Mesaj itkilərini önləyən mexanizmlər.
  • Flexible Routing: Exchange və binding vasitəsilə güclü routing.
  • Clustering: Yüksək əlçatanlıq və horizontal genişlənmə.
  • High Availability: Queue mirroring və federation.
  • Push-based Model: Broker mesajları consumer-lərə göndərir (push).
  • Multiple Protocols: AMQP, MQTT, STOMP, HTTP dəstəyi.

Əsas Komponentlər

1. Producer

  • Mesaj göndərən tətbiq
  • Exchange-ə mesaj publish edir
  • Heç vaxt birbaşa queue-yə yazma etmir

2. Exchange

  • Mesajları routing edir
  • Binding rules əsasında queue-lərə yönləndirir
  • 4 növ exchange:
    • Direct: Routing key ilə exact match
    • Fanout: Bütün bound queue-lərə göndərir
    • Topic: Pattern matching (wildcards)
    • Headers: Header attributes əsasında

3. Queue

  • Mesajları saxlayır
  • FIFO (First-In-First-Out) prinsipilə işləyir
  • Consumer-lərə mesaj çatdırır
  • Durable və ya transient ola bilər

4. Binding

  • Exchange və queue arasında əlaqə
  • Routing key və ya pattern təyin edir

5. Consumer

  • Mesaj alan tətbiq
  • Queue-dan mesaj consume edir
  • Acknowledgment (ACK) göndərir

6. Virtual Host (vhost)

  • Logical separation
  • Multi-tenancy üçün
  • Hər vhost ayrı namespace

RabbitMQ Arxitekturası

┌────────────────────────────────────────────────────────────┐
│ RabbitMQ Node │
│ │
│ Producer │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ EXCHANGE │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Direct │ │ Fanout │ │ Topic │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────┘ │
│ │ │ │ │
│ (bindings) (bindings) (bindings) │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────────────────────────────────────────┐ │
│ │ QUEUES │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Queue 1 │ │ Queue 2 │ │ Queue 3 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Consumer 1 Consumer 2 Consumer 3 │
│ │
└────────────────────────────────────────────────────────────┘

Exchange Növləri

1. Direct Exchange

Routing key exact match ilə işləyir:

Exchange: "logs"
Binding:
Queue "info-queue" → routing key: "info"
Queue "error-queue" → routing key: "error"

Message (routing key: "error") → error-queue
Message (routing key: "info") → info-queue
# Producer
channel.basic_publish(
exchange='logs',
routing_key='error',
body='Error message'
)

# Consumer
channel.queue_bind(
exchange='logs',
queue='error-queue',
routing_key='error'
)

2. Fanout Exchange

Bütün bound queue-lərə göndərir (broadcast):

Exchange: "notifications"
Binding:
Queue "email-queue"
Queue "sms-queue"
Queue "push-queue"

Message → email-queue, sms-queue, push-queue (hamısına)
# Producer
channel.basic_publish(
exchange='notifications',
routing_key='', # ignored in fanout
body='New notification'
)

# Consumer
channel.queue_bind(
exchange='notifications',
queue='email-queue'
)

3. Topic Exchange

Wildcard pattern matching:

Exchange: "logs"
Patterns:
* (star) → bir söz
# (hash) → sıfır və ya daha çox söz

Bindings:
Queue "critical-logs" → "*.critical.*"
Queue "app-logs" → "app.#"
Queue "all-logs" → "#"

Messages:
"app.critical.database" → critical-logs, app-logs, all-logs
"app.info.api" → app-logs, all-logs
"system.warning" → all-logs
# Producer
channel.basic_publish(
exchange='logs',
routing_key='app.critical.database',
body='Critical error'
)

# Consumer
channel.queue_bind(
exchange='logs',
queue='critical-logs',
routing_key='*.critical.*'
)

4. Headers Exchange

Message headers əsasında routing:

# Producer
channel.basic_publish(
exchange='tasks',
routing_key='',
body='Task data',
properties=pika.BasicProperties(
headers={'format': 'pdf', 'type': 'report'}
)
)

# Consumer
channel.queue_bind(
exchange='tasks',
queue='pdf-queue',
arguments={'format': 'pdf', 'x-match': 'all'}
)

Message Flow

Publishing

1. Producer → Connection açır
2. Producer → Channel yaradır
3. Producer → Exchange-ə mesaj publish edir
4. Exchange → Routing rules əsasında queue-lərə yönləndirir
5. Queue → Mesajı disk/memory-də saxlayır

Consuming

1. Consumer → Connection açır
2. Consumer → Channel yaradır
3. Consumer → Queue-dan mesaj request edir
4. RabbitMQ → Consumer-ə mesaj göndərir
5. Consumer → Mesajı process edir
6. Consumer → ACK göndərir
7. RabbitMQ → Mesajı queue-dan silir

Acknowledgment (ACK)

Manual Acknowledgment

def callback(ch, method, properties, body):
print(f"Received: {body}")

try:
# Process message
process_message(body)

# Success - ACK
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Failure - NACK (requeue)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False # Manual ACK
)

Auto Acknowledgment

channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=True # Automatic ACK (risky!)
)

Acknowledgment Növləri

  • basic.ack: Mesaj uğurla process edildi
  • basic.nack: Mesaj reject edildi (requeue ola bilər)
  • basic.reject: Bir mesajı reject et

Message Durability

Durable Queue

# Queue durable olmalıdır
channel.queue_declare(queue='tasks', durable=True)

Persistent Messages

# Mesaj persistent olmalıdır
channel.basic_publish(
exchange='',
routing_key='tasks',
body='Task data',
properties=pika.BasicProperties(
delivery_mode=2 # persistent
)
)

Qeyd: Həm queue, həm də mesaj persistent olmalıdır ki, mesaj itməsin.

Prefetch (QoS)

# Hər consumer maksimum 1 mesaj alsın (ACK alınana qədər)
channel.basic_qos(prefetch_count=1)

# Fair dispatch - işləyən consumer-ə daha çox mesaj
channel.basic_qos(prefetch_count=10)

Fair Dispatch:

Without QoS:
Consumer 1: [msg1, msg3, msg5, msg7, msg9] (ağır mesajlar)
Consumer 2: [msg2, msg4, msg6, msg8, msg10] (yüngül mesajlar)

With QoS (prefetch=1):
Consumer 1: [msg1, msg6, msg7] (balanslanmış)
Consumer 2: [msg2, msg3, msg4, msg5, msg8, msg9, msg10]

Dead Letter Exchange (DLX)

# Main queue with DLX
channel.queue_declare(
queue='tasks',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'failed',
'x-message-ttl': 60000 # 60 seconds
}
)

# Dead letter queue
channel.queue_declare(queue='failed-tasks', durable=True)
channel.queue_bind(
exchange='dlx',
queue='failed-tasks',
routing_key='failed'
)

Dead Letter halları:

  • Mesaj reject edildi (requeue=false)
  • TTL expired
  • Queue length limit exceeded

TTL (Time To Live)

Message TTL

# Per-message TTL
channel.basic_publish(
exchange='',
routing_key='tasks',
body='Task data',
properties=pika.BasicProperties(
expiration='60000' # 60 seconds
)
)

Queue TTL

# Bütün mesajlar üçün
channel.queue_declare(
queue='tasks',
arguments={'x-message-ttl': 60000}
)

Queue Expiration

# Queue-un özü expire olur (istifadə olunmazsa)
channel.queue_declare(
queue='temp-queue',
arguments={'x-expires': 300000} # 5 minutes
)

Priority Queues

# Priority queue yarat (0-9 arası)
channel.queue_declare(
queue='priority-tasks',
arguments={'x-max-priority': 10}
)

# Yüksək prioritetli mesaj
channel.basic_publish(
exchange='',
routing_key='priority-tasks',
body='Urgent task',
properties=pika.BasicProperties(priority=9)
)

# Aşağı prioritetli mesaj
channel.basic_publish(
exchange='',
routing_key='priority-tasks',
body='Normal task',
properties=pika.BasicProperties(priority=1)
)

Clustering

Cluster Növləri

1. Regular Cluster:

  • Metadata paylaşılır
  • Queue-lar bir node-da saxlanılır
  • High availability üçün queue mirroring lazımdır

2. Federation:

  • Müxtəlif datacenter-lər arası
  • Loose coupling
  • Message replication

3. Shovel:

  • Queue-lar arası message transfer
  • One-way və ya two-way

Queue Mirroring

# Policy yarat
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

# 2 replica
rabbitmqctl set_policy ha-two "^" '{"ha-mode":"exactly","ha-params":2}'

# Automatic sync
rabbitmqctl set_policy ha-sync "^" \
'{"ha-mode":"all","ha-sync-mode":"automatic"}'

Cluster Setup

# Node 1
rabbitmq-server -detached

# Node 2
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# Node 3
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

Flow Control

Memory-based Flow Control

# rabbitmq.conf
vm_memory_high_watermark.relative = 0.4 # 40% RAM
vm_memory_high_watermark_paging_ratio = 0.75

Disk-based Flow Control

disk_free_limit.absolute = 50GB
disk_free_limit.relative = 2.0 # 2x RAM

Flow Control Mexanizmi:

1. Memory threshold keçildi
2. RabbitMQ connection-ları block edir
3. Publisher mesaj göndərə bilmir
4. Memory azalır
5. Connection-lar unblock olur

Lazy Queues

# Lazy queue - mesajlar diskdə saxlanılır
channel.queue_declare(
queue='large-queue',
arguments={'x-queue-mode': 'lazy'}
)

Normal vs Lazy Queue:

  • Normal: Memory-first, sonra disk
  • Lazy: Disk-first, yalnız consume zamanı memory

Performance Optimization

Connection Pooling

# Çox connection açmaq əvəzinə pool istifadə et
connection_pool = []
for i in range(10):
connection_pool.append(pika.BlockingConnection())

Channel per Thread

# Hər thread üçün ayrı channel
import threading

thread_local = threading.local()

def get_channel():
if not hasattr(thread_local, 'channel'):
connection = pika.BlockingConnection()
thread_local.channel = connection.channel()
return thread_local.channel

Batch Publishing

# Bir-bir əvəzinə batch publish
messages = [...]
for msg in messages:
channel.basic_publish(
exchange='',
routing_key='queue',
body=msg
)
# Batching transaction ilə daha sürətlidir

Prefetch Tuning

# CPU-bound: aşağı prefetch
channel.basic_qos(prefetch_count=1)

# I/O-bound: yüksək prefetch
channel.basic_qos(prefetch_count=50)

Monitoring

Management Plugin

# Enable management plugin
rabbitmq-plugins enable rabbitmq_management

# Access: http://localhost:15672
# Default: guest/guest

Key Metrics

  • Queue depth: Queue-da neçə mesaj var
  • Message rate: Publish/consume rate
  • Consumer count: Aktiv consumer sayı
  • Memory usage: Node memory istifadəsi
  • Connection count: Aktiv connection sayı
  • Channel count: Aktiv channel sayı

CLI Commands

# Queue status
rabbitmqctl list_queues name messages consumers

# Exchange-lər
rabbitmqctl list_exchanges

# Bindings
rabbitmqctl list_bindings

# Connections
rabbitmqctl list_connections

# Memory usage
rabbitmqctl status

Security

User Management

# User yarat
rabbitmqctl add_user myuser mypassword

# Admin rol ver
rabbitmqctl set_user_tags myuser administrator

# Permissions
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"

SSL/TLS

# rabbitmq.conf
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true

Best Practices

  1. Durable Queues: Production-da həmişə durable queue istifadə edin
  2. Persistent Messages: Kritik mesajları persistent edin
  3. Manual ACK: Auto-ack əvəzinə manual ACK istifadə edin
  4. Prefetch: Əlverişli prefetch dəyəri seçin
  5. Connection Pooling: Az connection, çox channel
  6. DLX: Uğursuz mesajlar üçün dead letter queue
  7. Monitoring: Management plugin və alerting
  8. TTL: Köhnə mesajlar üçün TTL təyin edin
  9. Clustering: High availability üçün cluster qurun
  10. Lazy Queues: Böyük queue-lar üçün lazy mode

Common Patterns

Work Queues (Task Queue)

# Multiple workers processing tasks
for i in range(workers):
channel.basic_consume(queue='tasks', on_message_callback=callback)

Publish/Subscribe (Fanout)

# Broadcast to all consumers
channel.exchange_declare(exchange='logs', exchange_type='fanout')

Routing (Direct)

# Route by severity level
channel.exchange_declare(exchange='logs', exchange_type='direct')

Topics (Topic)

# Complex routing patterns
channel.exchange_declare(exchange='logs', exchange_type='topic')

RPC (Request/Reply)

# Synchronous request-response
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=callback_queue),
body=request
)

Troubleshooting

High Memory Usage

  • Lazy queues istifadə edin
  • Message TTL təyin edin
  • Consumer sayını artırın
  • Prefetch azaldın

Slow Consumers

  • Prefetch artırın
  • Consumer sayını artırın
  • Processing optimallaşdırın
  • Multiple threads istifadə edin

Connection Issues

  • Connection timeout artırın
  • Heartbeat konfiqurasiyası
  • Network stability yoxlayın
  • Firewall settings

Message Loss

  • Durable queues istifadə edin
  • Persistent messages
  • Publisher confirms
  • Manual ACK
  • Clustering + mirroring

Əlavə Resurslar