RabbitMQ Internals
- Message Broker: RabbitMQ açıq mənbəli mesaj broker sistemidir.
- AMQP Protocol: Advanced Message Queuing Protocol 0-9-1 implementasiyası.
- Reliable Messaging: Mesaj itkilərini önləyən mexanizmlər.
- Flexible Routing: Exchange və binding vasitəsilə güclü routing.
- Clustering: Yüksək əlçatanlıq və horizontal genişlənmə.
- High Availability: Queue mirroring və federation.
- Push-based Model: Broker mesajları consumer-lərə göndərir (push).
- Multiple Protocols: AMQP, MQTT, STOMP, HTTP dəstəyi.
Əsas Komponentlər
1. Producer
- Mesaj göndərən tətbiq
- Exchange-ə mesaj publish edir
- Heç vaxt birbaşa queue-yə yazma etmir
2. Exchange
- Mesajları routing edir
- Binding rules əsasında queue-lərə yönləndirir
- 4 növ exchange:
- Direct: Routing key ilə exact match
- Fanout: Bütün bound queue-lərə göndərir
- Topic: Pattern matching (wildcards)
- Headers: Header attributes əsasında
3. Queue
- Mesajları saxlayır
- FIFO (First-In-First-Out) prinsipilə işləyir
- Consumer-lərə mesaj çatdırır
- Durable və ya transient ola bilər
4. Binding
- Exchange və queue arasında əlaqə
- Routing key və ya pattern təyin edir
5. Consumer
- Mesaj alan tətbiq
- Queue-dan mesaj consume edir
- Acknowledgment (ACK) göndərir
6. Virtual Host (vhost)
- Logical separation
- Multi-tenancy üçün
- Hər vhost ayrı namespace
RabbitMQ Arxitekturası
┌────────────────────────────────────────────────────────────┐
│ RabbitMQ Node │
│ │
│ Producer │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ EXCHANGE │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Direct │ │ Fanout │ │ Topic │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────┘ │
│ │ │ │ │
│ (bindings) (bindings) (bindings) │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────────────────────────────────────────┐ │
│ │ QUEUES │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Queue 1 │ │ Queue 2 │ │ Queue 3 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Consumer 1 Consumer 2 Consumer 3 │
│ │
└────────────────────────────────────────────────────────────┘
Exchange Növləri
1. Direct Exchange
Routing key exact match ilə işləyir:
Exchange: "logs"
Binding:
Queue "info-queue" → routing key: "info"
Queue "error-queue" → routing key: "error"
Message (routing key: "error") → error-queue
Message (routing key: "info") → info-queue
# Producer
channel.basic_publish(
exchange='logs',
routing_key='error',
body='Error message'
)
# Consumer
channel.queue_bind(
exchange='logs',
queue='error-queue',
routing_key='error'
)
2. Fanout Exchange
Bütün bound queue-lərə göndərir (broadcast):
Exchange: "notifications"
Binding:
Queue "email-queue"
Queue "sms-queue"
Queue "push-queue"
Message → email-queue, sms-queue, push-queue (hamısına)
# Producer
channel.basic_publish(
exchange='notifications',
routing_key='', # ignored in fanout
body='New notification'
)
# Consumer
channel.queue_bind(
exchange='notifications',
queue='email-queue'
)
3. Topic Exchange
Wildcard pattern matching:
Exchange: "logs"
Patterns:
* (star) → bir söz
# (hash) → sıfır və ya daha çox söz
Bindings:
Queue "critical-logs" → "*.critical.*"
Queue "app-logs" → "app.#"
Queue "all-logs" → "#"
Messages:
"app.critical.database" → critical-logs, app-logs, all-logs
"app.info.api" → app-logs, all-logs
"system.warning" → all-logs
# Producer
channel.basic_publish(
exchange='logs',
routing_key='app.critical.database',
body='Critical error'
)
# Consumer
channel.queue_bind(
exchange='logs',
queue='critical-logs',
routing_key='*.critical.*'
)
4. Headers Exchange
Message headers əsasında routing:
# Producer
channel.basic_publish(
exchange='tasks',
routing_key='',
body='Task data',
properties=pika.BasicProperties(
headers={'format': 'pdf', 'type': 'report'}
)
)
# Consumer
channel.queue_bind(
exchange='tasks',
queue='pdf-queue',
arguments={'format': 'pdf', 'x-match': 'all'}
)
Message Flow
Publishing
1. Producer → Connection açır
2. Producer → Channel yaradır
3. Producer → Exchange-ə mesaj publish edir
4. Exchange → Routing rules əsasında queue-lərə yönləndirir
5. Queue → Mesajı disk/memory-də saxlayır
Consuming
1. Consumer → Connection açır
2. Consumer → Channel yaradır
3. Consumer → Queue-dan mesaj request edir
4. RabbitMQ → Consumer-ə mesaj göndərir
5. Consumer → Mesajı process edir
6. Consumer → ACK göndərir
7. RabbitMQ → Mesajı queue-dan silir
Acknowledgment (ACK)
Manual Acknowledgment
def callback(ch, method, properties, body):
print(f"Received: {body}")
try:
# Process message
process_message(body)
# Success - ACK
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Failure - NACK (requeue)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False # Manual ACK
)
Auto Acknowledgment
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=True # Automatic ACK (risky!)
)
Acknowledgment Növləri
- basic.ack: Mesaj uğurla process edildi
- basic.nack: Mesaj reject edildi (requeue ola bilər)
- basic.reject: Bir mesajı reject et
Message Durability
Durable Queue
# Queue durable olmalıdır
channel.queue_declare(queue='tasks', durable=True)
Persistent Messages
# Mesaj persistent olmalıdır
channel.basic_publish(
exchange='',
routing_key='tasks',
body='Task data',
properties=pika.BasicProperties(
delivery_mode=2 # persistent
)
)
Qeyd: Həm queue, həm də mesaj persistent olmalıdır ki, mesaj itməsin.
Prefetch (QoS)
# Hər consumer maksimum 1 mesaj alsın (ACK alınana qədər)
channel.basic_qos(prefetch_count=1)
# Fair dispatch - işləyən consumer-ə daha çox mesaj
channel.basic_qos(prefetch_count=10)
Fair Dispatch:
Without QoS:
Consumer 1: [msg1, msg3, msg5, msg7, msg9] (ağır mesajlar)
Consumer 2: [msg2, msg4, msg6, msg8, msg10] (yüngül mesajlar)
With QoS (prefetch=1):
Consumer 1: [msg1, msg6, msg7] (balanslanmış)
Consumer 2: [msg2, msg3, msg4, msg5, msg8, msg9, msg10]
Dead Letter Exchange (DLX)
# Main queue with DLX
channel.queue_declare(
queue='tasks',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'failed',
'x-message-ttl': 60000 # 60 seconds
}
)
# Dead letter queue
channel.queue_declare(queue='failed-tasks', durable=True)
channel.queue_bind(
exchange='dlx',
queue='failed-tasks',
routing_key='failed'
)
Dead Letter halları:
- Mesaj reject edildi (requeue=false)
- TTL expired
- Queue length limit exceeded
TTL (Time To Live)
Message TTL
# Per-message TTL
channel.basic_publish(
exchange='',
routing_key='tasks',
body='Task data',
properties=pika.BasicProperties(
expiration='60000' # 60 seconds
)
)
Queue TTL
# Bütün mesajlar üçün
channel.queue_declare(
queue='tasks',
arguments={'x-message-ttl': 60000}
)
Queue Expiration
# Queue-un özü expire olur (istifadə olunmazsa)
channel.queue_declare(
queue='temp-queue',
arguments={'x-expires': 300000} # 5 minutes
)
Priority Queues
# Priority queue yarat (0-9 arası)
channel.queue_declare(
queue='priority-tasks',
arguments={'x-max-priority': 10}
)
# Yüksək prioritetli mesaj
channel.basic_publish(
exchange='',
routing_key='priority-tasks',
body='Urgent task',
properties=pika.BasicProperties(priority=9)
)
# Aşağı prioritetli mesaj
channel.basic_publish(
exchange='',
routing_key='priority-tasks',
body='Normal task',
properties=pika.BasicProperties(priority=1)
)
Clustering
Cluster Növləri
1. Regular Cluster:
- Metadata paylaşılır
- Queue-lar bir node-da saxlanılır
- High availability üçün queue mirroring lazımdır
2. Federation:
- Müxtəlif datacenter-lər arası
- Loose coupling
- Message replication
3. Shovel:
- Queue-lar arası message transfer
- One-way və ya two-way
Queue Mirroring
# Policy yarat
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
# 2 replica
rabbitmqctl set_policy ha-two "^" '{"ha-mode":"exactly","ha-params":2}'
# Automatic sync
rabbitmqctl set_policy ha-sync "^" \
'{"ha-mode":"all","ha-sync-mode":"automatic"}'
Cluster Setup
# Node 1
rabbitmq-server -detached
# Node 2
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# Node 3
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
Flow Control
Memory-based Flow Control
# rabbitmq.conf
vm_memory_high_watermark.relative = 0.4 # 40% RAM
vm_memory_high_watermark_paging_ratio = 0.75
Disk-based Flow Control
disk_free_limit.absolute = 50GB
disk_free_limit.relative = 2.0 # 2x RAM
Flow Control Mexanizmi:
1. Memory threshold keçildi
2. RabbitMQ connection-ları block edir
3. Publisher mesaj göndərə bilmir
4. Memory azalır
5. Connection-lar unblock olur
Lazy Queues
# Lazy queue - mesajlar diskdə saxlanılır
channel.queue_declare(
queue='large-queue',
arguments={'x-queue-mode': 'lazy'}
)
Normal vs Lazy Queue:
- Normal: Memory-first, sonra disk
- Lazy: Disk-first, yalnız consume zamanı memory
Performance Optimization
Connection Pooling
# Çox connection açmaq əvəzinə pool istifadə et
connection_pool = []
for i in range(10):
connection_pool.append(pika.BlockingConnection())
Channel per Thread
# Hər thread üçün ayrı channel
import threading
thread_local = threading.local()
def get_channel():
if not hasattr(thread_local, 'channel'):
connection = pika.BlockingConnection()
thread_local.channel = connection.channel()
return thread_local.channel
Batch Publishing
# Bir-bir əvəzinə batch publish
messages = [...]
for msg in messages:
channel.basic_publish(
exchange='',
routing_key='queue',
body=msg
)
# Batching transaction ilə daha sürətlidir
Prefetch Tuning
# CPU-bound: aşağı prefetch
channel.basic_qos(prefetch_count=1)
# I/O-bound: yüksək prefetch
channel.basic_qos(prefetch_count=50)
Monitoring
Management Plugin
# Enable management plugin
rabbitmq-plugins enable rabbitmq_management
# Access: http://localhost:15672
# Default: guest/guest
Key Metrics
- Queue depth: Queue-da neçə mesaj var
- Message rate: Publish/consume rate
- Consumer count: Aktiv consumer sayı
- Memory usage: Node memory istifadəsi
- Connection count: Aktiv connection sayı
- Channel count: Aktiv channel sayı
CLI Commands
# Queue status
rabbitmqctl list_queues name messages consumers
# Exchange-lər
rabbitmqctl list_exchanges
# Bindings
rabbitmqctl list_bindings
# Connections
rabbitmqctl list_connections
# Memory usage
rabbitmqctl status
Security
User Management
# User yarat
rabbitmqctl add_user myuser mypassword
# Admin rol ver
rabbitmqctl set_user_tags myuser administrator
# Permissions
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
SSL/TLS
# rabbitmq.conf
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
Best Practices
- Durable Queues: Production-da həmişə durable queue istifadə edin
- Persistent Messages: Kritik mesajları persistent edin
- Manual ACK: Auto-ack əvəzinə manual ACK istifadə edin
- Prefetch: Əlverişli prefetch dəyəri seçin
- Connection Pooling: Az connection, çox channel
- DLX: Uğursuz mesajlar üçün dead letter queue
- Monitoring: Management plugin və alerting
- TTL: Köhnə mesajlar üçün TTL təyin edin
- Clustering: High availability üçün cluster qurun
- Lazy Queues: Böyük queue-lar üçün lazy mode
Common Patterns
Work Queues (Task Queue)
# Multiple workers processing tasks
for i in range(workers):
channel.basic_consume(queue='tasks', on_message_callback=callback)
Publish/Subscribe (Fanout)
# Broadcast to all consumers
channel.exchange_declare(exchange='logs', exchange_type='fanout')
Routing (Direct)
# Route by severity level
channel.exchange_declare(exchange='logs', exchange_type='direct')
Topics (Topic)
# Complex routing patterns
channel.exchange_declare(exchange='logs', exchange_type='topic')
RPC (Request/Reply)
# Synchronous request-response
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=callback_queue),
body=request
)
Troubleshooting
High Memory Usage
- Lazy queues istifadə edin
- Message TTL təyin edin
- Consumer sayını artırın
- Prefetch azaldın
Slow Consumers
- Prefetch artırın
- Consumer sayını artırın
- Processing optimallaşdırın
- Multiple threads istifadə edin
Connection Issues
- Connection timeout artırın
- Heartbeat konfiqurasiyası
- Network stability yoxlayın
- Firewall settings
Message Loss
- Durable queues istifadə edin
- Persistent messages
- Publisher confirms
- Manual ACK
- Clustering + mirroring
Əlavə Resurslar
- Rəsmi Sənədlər: https://www.rabbitmq.com/documentation.html
- Management Plugin: https://www.rabbitmq.com/management.html
- Clustering Guide: https://www.rabbitmq.com/clustering.html
- Best Practices: https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html