Nedir Bu Message Queue?
Herkese merhaba,
Bugün, yazılım sistemlerinde bileşenler arasında mesaj alışverişini kolaylaştıran ve sistemlerin performansını artıran bir mekanizma olan Message Queue (Mesaj Kuyruğu) konusunu ele alacağız. Message Queue’nin ne olduğunu, temel özelliklerini, hangi alanlarda kullanıldığını ve popüler Message Queue sistemlerini inceleyeceğiz. Ayrıca, RabbitMQ, Apache Kafka, Amazon SQS ve Redis kullanarak basit birkaç mesaj kuyruğu nasıl oluşturulur, örnek kodlarla göstereceğiz. Hazırsanız, başlayalım!
Message Queue Nedir?
Message Queue (Mesaj Kuyruğu), yazılım sistemlerinde farklı bileşenler arasında mesajların güvenilir ve asenkron bir şekilde iletilmesini sağlayan bir yazılım mimarisi bileşenidir. Bu yapı, mesajların belirli bir sırayla gönderilip alınmasını ve işlem görmesini sağlar. Mesaj kuyruğu sistemleri, bileşenlerin birbirinden bağımsız çalışabilmesini ve sistem performansının artmasını sağlar.
Temel Kavramlar
- Gönderen (Producer): Mesajı oluşturan ve mesaj kuyruğuna yerleştiren bileşendir. Örneğin, bir kullanıcı sipariş verdiğinde bu bilgiyi işlemek üzere kuyruğa bir mesaj ekler.
- Alıcı (Consumer): Mesajı kuyruğun uygun bir zamanında alıp işleyen bileşendir. Örneğin, sipariş bilgisi kuyruktan alınıp envanter güncelleme işlemi gerçekleştirilir.
- Kuyruk (Queue): Mesajların geçici olarak depolandığı ve sıralandığı yerdir. Mesajlar, kuyruğa giriş sırasına göre alınır ve işlenir (FIFO — First In, First Out).
Çalışma Şekli
- Mesaj Gönderimi: Gönderen bileşen, bir mesaj oluşturur ve bu mesajı kuyruğa ekler.
- Mesaj Depolama: Mesaj, kuyrukta bekler. Kuyruk, mesajları geçici olarak saklar ve onları alıcılara uygun bir zaman geldiğinde iletir.
- Mesaj Alma: Alıcı bileşen, kuyruğu dinler ve uygun olduğunda mesajı alıp işler.
Neden Asenkron İletişim?
Asenkron iletişim, bileşenlerin birbirinden bağımsız çalışmasını sağlar. Bu, özellikle yüksek performans ve esneklik gerektiren sistemlerde kritiktir. Örneğin:
- Bağımsız İşlem: Gönderen bileşen, mesajı kuyruğa ekledikten sonra başka işlemler yapmaya devam edebilir. Alıcı bileşen ise uygun olduğunda mesajı alır ve işler.
- Yüksek Performans: Bileşenler, aynı anda birden fazla işlem yapabilir. Gönderici ve alıcı, birbirini beklemek zorunda kalmaz.
- Hata Toleransı: Bir bileşen geçici olarak kullanılamazsa, mesaj kuyrukta bekler ve bileşen tekrar kullanılabilir hale geldiğinde işleme alınır.
Message Queue Sistemlerinin Temel Özellikleri
- Dayanıklılık: Mesajlar, belirli bir süre veya belirli koşullar sağlanana kadar kuyrukta bekletilir. Bu, mesajların kaybolmamasını ve sistemin güvenilir olmasını sağlar.
- Ölçeklenebilirlik: Sistem bileşenleri arasındaki iletişim yükü artarsa, yeni alıcılar eklenerek sistem yatayda genişletilebilir.
- Çeşitli İletişim Modelleri: Point-to-Point (Noktadan Noktaya) ve Publish-Subscribe (Yayınla-Abone Ol) gibi farklı iletişim modellerini destekler.
- Önceliklendirme: Bazı mesajlar diğerlerinden daha önemli olabilir ve öncelikli olarak işlenmesi gerekebilir. Mesaj kuyruğu bu tür öncelikleri yönetebilir.
Message Queue’nin Kullanım Alanları
- Dağıtık Sistemler: Farklı sunucular arasında mesaj alışverişi yapılmasını sağlar.
- Mikroservis Mimarisi: Mikroservisler arasında bağımsız ve güvenilir iletişim kurmak için kullanılır.
- İşlem Sıralama: Büyük veri işleme ve sıraya koyma gereksinimi olan uygulamalarda kullanılır.
- E-posta Kuyruklama: E-posta gönderme işlemlerini sıraya koyarak sistem performansını artırır.
- Arka Plan Görevleri: Zaman alan işlemler (örneğin, dosya işleme, veri analizi) arka planda çalıştırılarak kullanıcı deneyimi iyileştirilir.
Popüler Message Queue Sistemleri
- RabbitMQ: Yaygın olarak kullanılan, açık kaynaklı bir mesaj kuyruğu sistemidir. AMQP protokolünü kullanır.
- Apache Kafka: Yüksek hacimli veri akışlarını işlemek için tasarlanmış bir dağıtık akış platformudur.
- Amazon SQS (Simple Queue Service): Amazon Web Services (AWS) tarafından sağlanan yönetilen bir mesaj kuyruğu hizmetidir.
- Redis: Yüksek performanslı bir anahtar-değer veritabanı olmasının yanı sıra, mesaj kuyruğu işlevselliği de sunar.
Bileşenler Nedir?
Bileşenler, yazılım sistemlerinin belirli bir işlevini gerçekleştiren bağımsız modüllerdir. Bu modüller, sistemin geneline katkıda bulunan özel görevleri yerine getirir ve büyük yazılım sistemlerinin daha yönetilebilir ve esnek olmasını sağlar.
- Mikroservisler (Microservices): Büyük bir uygulamanın bağımsız olarak geliştirilebilen, dağıtılabilen ve ölçeklenebilen küçük parçalarıdır. Örneğin, bir e-ticaret sitesinde kullanıcı yönetimi, ürün kataloğu ve sipariş yönetimi mikroservisleri olarak ayrılabilir.
- Veritabanları (Databases): Verileri saklayan ve bu verilere erişim sağlayan bileşenlerdir. Örneğin, PostgreSQL, MongoDB gibi veritabanları, verilerin saklanması ve yönetilmesi için kullanılır.
- Ön Uç ve Arka Uç (Frontend and Backend): Ön uç, kullanıcı arayüzünü oluşturan bileşenlerdir; arka uç ise iş mantığını işleyen bileşenlerdir. Örneğin, React veya Angular ile yazılmış ön uç bileşenleri ve Node.js veya Go ile yazılmış arka uç bileşenleri, kullanıcı arayüzü ve iş mantığını yönetir.
- API Gateway (API Ağ Geçidi): Farklı mikroservislerin birleşik bir API üzerinden erişilmesini sağlayan bileşendir. Örneğin, NGINX veya API Gateway gibi araçlar, mikroservislerin yönetimini ve erişimini kolaylaştırır.
- İşlemciler ve Kuyruk İşleyiciler (Processors and Queue Handlers): Kuyrukta bekleyen görevleri işleyen bileşenlerdir. Örneğin, arka planda e-posta gönderen, raporlar oluşturan veya veri işleyen işlemciler, görevlerin verimli bir şekilde yerine getirilmesini sağlar.
Bileşenler Arası Mesaj Alışverişi
Bileşenler arasındaki mesaj alışverişi, genellikle asenkron bir şekilde gerçekleştirilir ve Message Queue sistemleri bu süreci kolaylaştırır.
- Mikroservisler Arası İletişim: Bir sipariş yönetimi mikroservisi, bir ödeme mikroservisine ödeme işleminin tamamlandığını bildirmek için mesaj kuyruğunu kullanabilir.
- Ön Uç ve Arka Uç İletişimi: Ön uç, kullanıcı giriş bilgilerini arka uca gönderir ve arka uç, doğrulama işlemini gerçekleştirdikten sonra sonucu ön uca iletir.
- Veritabanı Güncellemeleri: Bir mikroservis, veritabanında bir güncelleme yaptığında diğer ilgili mikroservislere bu değişikliği bildirmek için mesaj kuyruğunu kullanabilir.
- İşlem Kuyrukları: Büyük veri işlemleri veya arka plan görevleri, bir işlemci bileşenine iletilir ve bu bileşen görevleri sırayla işler.
Buraya kadar bazı şeylere hakim olduk gelin bir senaryo hazırlayalım ve bir kaç örnek yapıp konuyu bitirelim.
Senaryo
Bir restoran düşünün. Garson, siparişinizi not alıp mutfağa iletiyor. O sırada siz de diğer müşterileri izlemeye başlıyorsunuz. Fark ediyorsunuz ki, her sipariş sırasıyla alınarak mutfağa iletiliyor ve sırasıyla hazırlanıyor.
Garson, tüm siparişleri bir seferde alıyor. İlk müşteri John Doe, Pizza Margherita siparişini veriyor. Ardından, Jane Smith Spaghetti Carbonara siparişini veriyor. Mike Johnson Caesar Salad ve son olarak Anna Brown Tiramisu siparişini veriyor. Tüm bu siparişler garson tarafından bir adisyonda toplanıyor.
Tüm siparişler toplandıktan sonra, garson bu siparişleri mutfağa iletiyor. Mutfağa iletilen siparişler, sırayla hazırlanıp müşterilere teslim ediliyor. Önce John Doe’nun pizza siparişi hazırlanıp teslim ediliyor. Ardından Jane Smith’in spagetti siparişi hazırlanıyor ve teslim ediliyor. Bu şekilde sırayla tüm siparişler tamamlanıyor.
RabbitMQ
RabbitMQ, açık kaynaklı bir mesaj aracısı yazılımıdır. Advanced Message Queuing Protocol (AMQP) standardını kullanarak çalışır ve mesajların farklı uygulamalar veya bileşenler arasında güvenilir bir şekilde iletilmesini sağlar. Yüksek performans, esneklik ve kolay kurulum gibi özellikleri sayesinde yaygın olarak kullanılır.
RabbitMQ’nun Temel Özellikleri
- Dayanıklılık: Mesajlar disk üzerinde saklanabilir, böylece sistem çökse bile mesajlar kaybolmaz.
- Esneklik: Farklı platformlar ve diller arasında iletişim sağlar. AMQP, MQTT, STOMP gibi protokolleri destekler.
- Ölçeklenebilirlik: Yatay ve dikey ölçeklenebilir. Yani, sistem yükü arttıkça daha fazla sunucu ekleyerek performansı artırabilirsiniz.
- Yönetim Arayüzü: RabbitMQ, web tabanlı bir yönetim arayüzü sunar. Bu arayüz üzerinden kuyruğu yönetebilir, mesaj trafiğini izleyebilir ve sistem performansını gözlemleyebilirsiniz.
- Geniş Eklenti Desteği: RabbitMQ, çeşitli eklentilerle işlevselliğini artırabilir. Örneğin, federasyon, kümeleme ve plugin desteği gibi.
RabbitMQ’nun Çalışma Şekli
- Üretici (Producer): Mesajları oluşturur ve RabbitMQ’ya gönderir.
- Değiştirici (Exchange): Gelen mesajları alır ve uygun kuyruklara yönlendirir. Değiştirici türleri arasında direct, topic, fanout ve headers bulunur.
- Kuyruk (Queue): Mesajların geçici olarak depolandığı yerdir.
- Tüketici (Consumer): Mesajları kuyruktan alır ve işler.
Kodun tamamı: https://github.com/ExorTek/nodejs-message-queue-examples
// Gerekli modülleri içe aktar
const amqp = require('amqplib'); // amqplib kütüphanesini dahil ediyoruz
const orders = require('../orders'); // 'orders' modülünü dahil ediyoruz
const { pause } = require('../helpers'); // 'pause' fonksiyonunu 'helpers' modülünden dahil ediyoruz
// RabbitMQ yapılandırması
const queueName = 'orderQueue'; // Kullanılacak kuyruk adını belirliyoruz
const amqpUrl = 'amqp://localhost'; // RabbitMQ bağlantı adresini belirliyoruz
// Global kanal değişkeni
let channel; // Kanalı global değişken olarak tanımlıyoruz
// RabbitMQ sunucusuna bağlan ve kanalı ayarla
amqp
.connect(amqpUrl)
.then(async (connection) => {
// Bir kanal oluştur
channel = await connection.createChannel(); // Bağlantı üzerinden bir kanal oluşturuyoruz
// Kuyruğu oluştur veya mevcutsa kullan
await channel.assertQueue(queueName, { durable: true }); // Kuyruğu kalıcı olacak şekilde oluşturuyoruz veya mevcutsa kullanıyoruz
console.info('RabbitMQ connected successfully!'); // Bağlantı başarıyla sağlandığında bilgi mesajı
// Siparişleri oluştur ve işle
await createOrders(); // Siparişleri oluşturma fonksiyonunu çağırıyoruz
await processOrders(); // Siparişleri işleme fonksiyonunu çağırıyoruz
// İşlemler bitince kanalı ve bağlantıyı kapat
await channel.close(); // Kanalı kapatıyoruz
await connection.close(); // Bağlantıyı kapatıyoruz
console.info('RabbitMQ connection closed, all tasks completed.'); // Tüm işlemler tamamlandığında bilgi mesajı
process.exit(0); // İşlemi sonlandırıyoruz
})
.catch((err) => {
console.error('RabbitMQ connection error', err); // Hata durumunda hata mesajı
if (channel) channel.close(); // Hata durumunda kanal açıksa kapatıyoruz
process.exit(1); // Hata durumunda işlemi sonlandırıyoruz
});
// Siparişleri oluştur ve kuyruğa gönder
const createOrders = async () => {
for (const order of orders) { // Her bir sipariş için döngü
console.log(` [x] ${order.product} order received from ${order.customer}`); // Sipariş alındığında bilgi mesajı
await channel.sendToQueue(queueName, Buffer.from(JSON.stringify(order)), { persistent: true }); // Siparişi JSON formatında kuyruğa gönderiyoruz
await pause(1000); // İşlem süresini simüle etmek için duraklatma
}
};
// Kuyruktan siparişleri işle
const processOrders = async () => {
while (true) { // Sonsuz döngü ile sürekli sipariş kontrolü
const msg = await channel.get(queueName, { noAck: false }); // Kuyruktan mesaj alıyoruz
if (msg) { // Mesaj varsa
const parsedOrder = JSON.parse(msg.content.toString()); // Mesajı JSON formatında çözümleme
console.log(` [x] The order was completed and delivered to ${parsedOrder.customer}`); // Sipariş tamamlandığında bilgi mesajı
channel.ack(msg); // Mesajı onaylıyoruz
await pause(3000); // İşlem süresini simüle etmek için duraklatma
} else { // Mesaj yoksa
console.log('No orders left'); // Kuyrukta sipariş kalmadığında bilgi mesajı
break; // Döngüyü sonlandır
}
}
};
Apache Kafka
Apache Kafka, açık kaynaklı bir dağıtık akış platformudur. Başlangıçta LinkedIn tarafından geliştirilen ve ardından Apache Software Foundation tarafından açık kaynak olarak sunulan Kafka, yüksek hacimli veri akışlarını işlemek ve gerçek zamanlı veri akışlarını yönetmek için tasarlanmıştır. Özellikle büyük veri ve dağıtık sistemler için ideal bir çözüm sunar.
Apache Kafka’nın Temel Özellikleri
- Yüksek Performans: Kafka, saniyede milyonlarca mesajı düşük gecikmeyle işleyebilir.
- Dayanıklılık: Mesajlar disk üzerinde kalıcı olarak saklanır, bu da veri kaybını önler ve güvenilirliği artırır.
- Dağıtık Mimari: Kafka, kümeleme (clustering) ve bölme (partitioning) ile yüksek düzeyde ölçeklenebilir. Birden fazla sunucu üzerinde çalışarak yük dengelemesi sağlar.
- Esneklik: Kafka, hem yayınlama-abone olma (publish-subscribe) modelini hem de mesaj kuyruğu (message queue) modelini destekler.
- Gerçek Zamanlı İşleme: Gerçek zamanlı veri akışlarını işleyebilir, bu da anlık analiz ve veri işleme uygulamaları için idealdir.
- Eklenti ve Entegrasyon Desteği: Kafka, çeşitli veri kaynakları ve hedefleri ile entegrasyon için geniş bir eklenti ekosistemine sahiptir.
Apache Kafka’nın Çalışma Şekli
- Üretici (Producer): Mesajları oluşturur ve Kafka’ya gönderir. Mesajlar belirli bir konuya (topic) gönderilir.
- Konu (Topic): Mesajların kategorize edildiği yerdir. Her konu, bir veya daha fazla bölüme (partition) ayrılabilir.
- Bölüm (Partition): Her konu, daha küçük bölümlere ayrılarak paralel işleme olanak tanır. Bu, yük dengelemesi ve yüksek performans sağlar.
- Tüketici (Consumer): Mesajları belirli bir konudan alır ve işler. Tüketiciler, tüketici grupları (consumer groups) halinde çalışabilir, bu da mesajların paralel ve ölçeklenebilir bir şekilde işlenmesini sağlar.
- Broker: Kafka kümesindeki her sunucuya verilen addır. Broker’lar, mesajları depolar ve yönlendirir.
- Zookeeper: Kafka kümesinin koordinasyonunu ve yapılandırma yönetimini sağlar.
Apache Kafka, özellikle yüksek hacimli veri akışlarını yönetmek ve analiz etmek isteyen büyük ölçekli uygulamalar için ideal bir çözümdür. Gerçek zamanlı veri işleme ve analiz uygulamaları için mükemmel bir performans sunar.
Kodun tamamı: https://github.com/ExorTek/nodejs-message-queue-examples
const { Kafka } = require('kafkajs'); // KafkaJS kütüphanesini dahil ediyoruz
const orders = require('../orders'); // 'orders' modülünü dahil ediyoruz
const { pause } = require('../helpers'); // 'pause' fonksiyonunu 'helpers' modülünden dahil ediyoruz
const kafka = new Kafka({
clientId: 'order-app', // Kafka istemcisi için kimlik belirliyoruz
brokers: ['localhost:9092'], // Kafka broker adreslerini belirliyoruz
});
const producer = kafka.producer(); // Bir Kafka üreticisi oluşturuyoruz
const consumer = kafka.consumer({ groupId: 'order-group' }); // Bir Kafka tüketicisi oluşturuyoruz ve bir grup kimliği belirliyoruz
const topic = 'orderTopic'; // Kullanılacak konuyu belirliyoruz
(async () => {
await producer.connect(); // Üreticiyi Kafka'ya bağlıyoruz
console.info('Producer connected successfully!'); // Üretici başarıyla bağlandığında bilgi mesajı
await consumer.connect(); // Tüketiciyi Kafka'ya bağlıyoruz
console.info('Consumer connected successfully!'); // Tüketici başarıyla bağlandığında bilgi mesajı
await consumer.subscribe({ topic, fromBeginning: true }); // Tüketiciyi belirli bir konuya abone yapıyoruz
console.info(`Subscribed to ${topic}`); // Abonelik başarıyla tamamlandığında bilgi mesajı
await createOrders(); // Siparişleri oluşturma fonksiyonunu çağırıyoruz
await processOrders(); // Siparişleri işleme fonksiyonunu çağırıyoruz
})();
const createOrders = async () => {
for (const order of orders) { // Her bir sipariş için döngü
console.log(` [x] ${order.product} order received from ${order.customer}`); // Sipariş alındığında bilgi mesajı
await producer.send({
topic: topic, // Mesajın gönderileceği konuyu belirliyoruz
messages: [{ value: JSON.stringify(order) }], // Siparişi JSON formatında mesaj olarak gönderiyoruz
});
await pause(1000); // İşlem süresini simüle etmek için duraklatma
}
};
const processOrders = async () => {
await consumer
.run({
eachMessage: async ({ topic, partition, message }) => { // Her bir mesaj için işleme fonksiyonu
const parsedOrder = JSON.parse(message.value.toString()); // Mesajı JSON formatında çözümleme
console.log(` [x] The order was completed and delivered to ${parsedOrder.customer}`); // Sipariş tamamlandığında bilgi mesajı
await pause(3000); // İşlem süresini simüle etmek için duraklatma
},
})
.then(async () => {
await producer.disconnect(); // Üretici bağlantısını kes
await consumer.disconnect(); // Tüketici bağlantısını kes
console.info('Kafka connection closed, all tasks completed.'); // Tüm işlemler tamamlandığında bilgi mesajı
process.exit(0); // İşlemi sonlandır
})
.catch((err) => {
console.error('Error processing orders', err); // Hata mesajı
process.exit(1); // Hata durumunda işlemi sonlandır
});
};
Amazon SQS
Amazon SQS (Simple Queue Service), AWS tarafından sağlanan yönetilen bir mesaj kuyruğu hizmetidir. Farklı bileşenler arasında mesajların güvenilir ve asenkron bir şekilde iletilmesini sağlar. Tamamen yönetilen bir hizmet olduğu için altyapı yönetimi gerektirmez ve kolay ölçeklenebilirlik sunar.
Amazon SQS’nin Temel Özellikleri
- Dayanıklılık: Mesajlar yüksek dayanıklılıkla saklanır, böylece veri kaybı riski minimize edilir.
- Esneklik: Çeşitli mesaj türlerini ve boyutlarını destekler. FIFO (First-In-First-Out) ve Standart kuyruk tiplerini sunar.
- Ölçeklenebilirlik: Otomatik olarak ölçeklenir. Trafik artışlarına otomatik olarak uyum sağlar ve yüksek hacimli mesajları işleyebilir.
- Güvenlik: Mesajların güvenliğini sağlamak için AWS IAM (Identity and Access Management) ile entegrasyon ve şifreleme seçenekleri sunar.
- Kolay Entegrasyon: Diğer AWS hizmetleri ve üçüncü taraf uygulamalarla kolayca entegre edilebilir.
Amazon SQS’nin Çalışma Şekli
- Üretici (Producer): Mesajları oluşturur ve SQS kuyruğuna gönderir.
- Kuyruk (Queue): Mesajların geçici olarak depolandığı yerdir. Mesajlar sırayla alınır ve işlenir.
- Tüketici (Consumer): Mesajları kuyruğa gelen sırasıyla alır ve işler. FIFO kuyruklarda mesajlar kesinlikle gönderildikleri sırayla işlenir.
Kodun tamamı: https://github.com/ExorTek/nodejs-message-queue-examples
const { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs'); // AWS SQS kütüphanesinden gerekli sınıfları içe aktar
const orders = require('../orders'); // 'orders' modülünü içe aktar
const { pause } = require('../helpers'); // 'pause' fonksiyonunu 'helpers' modülünden içe aktar
const region = 'YOUR_REGION'; // AWS bölgesini belirle
const config = {
region: region, // Bölgeyi yapılandırma içine ekle
credentials: {
accessKeyId: 'YOUR_ACCESS_KEY_ID', // AWS erişim anahtar kimliği
secretAccessKey: 'YOUR_SECRET_ACCESS_KEY', // AWS gizli erişim anahtarı
},
messageGroupId: 'ORDER_GROUP_ID', // Mesaj grup kimliği
queueUrl: `https://sqs.${region}.amazonaws.com/ACCOUNT_ID/ORDER_QUEUE_NAME`, // Kuyruk URL'si
};
const sqsClient = new SQSClient({
region: config.region, // SQS istemcisini bölgeyle yapılandır
credentials: config.credentials, // SQS istemcisini kimlik bilgileriyle yapılandır
});
const createOrders = async () => {
for (const order of orders) { // Her bir sipariş için döngü
console.log(` [x] ${order.product} order received from ${order.customer}`); // Sipariş alındığında bilgi mesajı
const params = {
QueueUrl: config.queueUrl, // Mesajın gönderileceği kuyruk URL'si
MessageBody: JSON.stringify(order), // Mesaj gövdesi olarak siparişi JSON formatında gönder
DelaySeconds: 0, // Mesaj gönderiminde gecikme yok
MessageDeduplicationId: order.id.toString(), // Mesajın tekrarlanmasını önlemek için kimlik
MessageGroupId: config.messageGroupId, // Mesaj grup kimliği
};
await sqsClient.send(new SendMessageCommand(params)); // Mesajı SQS kuyruğuna gönder
await pause(1000); // İşlem süresini simüle etmek için duraklatma
}
};
const processOrders = async () => {
while (true) { // Sonsuz döngü ile sürekli sipariş kontrolü
const params = {
QueueUrl: config.queueUrl, // Mesajların alınacağı kuyruk URL'si
MaxNumberOfMessages: 1, // Aynı anda en fazla bir mesaj al
WaitTimeSeconds: 10, // Mesaj bekleme süresi
};
const data = await sqsClient.send(new ReceiveMessageCommand(params)); // SQS kuyruğundan mesaj al
if (data.Messages && data.Messages.length > 0) { // Mesaj varsa
const message = data.Messages[0]; // İlk mesajı al
const parsedOrder = JSON.parse(message.Body); // Mesaj gövdesini JSON formatında çözümle
console.log(` [x] The order was completed and delivered to ${parsedOrder.customer}`); // Sipariş tamamlandığında bilgi mesajı
const deleteParams = {
QueueUrl: config.queueUrl, // Silinecek mesajın kuyruk URL'si
ReceiptHandle: message.ReceiptHandle, // Mesajın ReceiptHandle değeri
};
await sqsClient.send(new DeleteMessageCommand(deleteParams)); // Mesajı kuyruktan sil
await pause(3000); // İşlem süresini simüle etmek için duraklatma
} else { // Mesaj yoksa
console.log('No orders left'); // Kuyrukta sipariş kalmadığında bilgi mesajı
break; // Döngüyü sonlandır
}
}
};
(async () => {
console.info('AWS SQS connected successfully!'); // AWS SQS bağlantısı başarıyla sağlandığında bilgi mesajı
await createOrders(); // Siparişleri oluşturma fonksiyonunu çağır
await processOrders(); // Siparişleri işleme fonksiyonunu çağır
console.info('AWS SQS tasks completed.'); // Tüm görevler tamamlandığında bilgi mesajı
process.exit(0); // İşlemi sonlandır
})();
Redis
Redis, açık kaynaklı ve yüksek performanslı bir bellek içi veri yapısı deposudur. Genellikle önbellekleme, oturum yönetimi ve hızlı veri erişimi gerektiren uygulamalarda kullanılır. Redis, çeşitli veri yapıları (string, hash, list, set, sorted set) ile çalışabilme yeteneği sayesinde esnek ve güçlü bir çözüm sunar.
Redis’in Temel Özellikleri
- Yüksek Performans: Bellek içi depolama sayesinde çok düşük gecikmeyle veri okuma ve yazma işlemleri yapabilir.
- Dayanıklılık: Veri kaybını önlemek için RDB (Redis Database) ve AOF (Append-Only File) gibi kalıcı depolama seçenekleri sunar.
- Esneklik: Çeşitli veri yapıları (string, hash, list, set, sorted set) destekler, bu da farklı veri gereksinimlerine uyum sağlar.
- Dağıtık Mimari: Redis, master-slave replikasyonu ve Redis Cluster ile ölçeklenebilirlik sağlar.
- Atomic Operations: Redis, tüm işlemlerin atomik olarak gerçekleşmesini sağlar, bu da veri tutarlılığını artırır.
- Geniş Eklenti Desteği: Redis, çeşitli modüller ve eklentiler ile işlevselliğini artırabilir.
Redis’in Çalışma Şekli
- Üretici (Producer): Mesajları oluşturur ve Redis kuyruklarına (list) gönderir.
- Kuyruk (Queue): Mesajların geçici olarak depolandığı yerdir. Redis’te bu, list veri yapısı kullanılarak gerçekleştirilir.
- Tüketici (Consumer): Mesajları kuyruktan alır ve işler. Redis’te
LPOP
komutu kullanılarak kuyruktaki mesajlar alınır.
Kodun tamamı: https://github.com/ExorTek/nodejs-message-queue-examples
const redis = require('redis'); // redis kütüphanesini dahil ediyoruz
const orders = require('../orders'); // 'orders' modülünü dahil ediyoruz
const { pause } = require('../helpers'); // 'pause' fonksiyonunu 'helpers' modülünden dahil ediyoruz
const queueName = 'orderQueue'; // Kullanılacak kuyruk adını belirliyoruz
const redisClient = redis.createClient(); // Bir Redis istemcisi oluşturuyoruz
redisClient.on('error', (err) => { // Redis istemcisinde hata oluşursa hata mesajı
console.error('Redis Client Error', err);
});
(async () => {
await redisClient.connect(); // Redis istemcisini sunucuya bağlıyoruz
console.info('Redis connected successfully!'); // Bağlantı başarıyla sağlandığında bilgi mesajı
await createOrders(); // Siparişleri oluşturma fonksiyonunu çağırıyoruz
await processOrders(); // Siparişleri işleme fonksiyonunu çağırıyoruz
await redisClient.quit(); // Redis istemcisini kapatıyoruz
process.exit(0); // İşlemi sonlandırıyoruz
})().catch((err) => { // Hata durumunda
console.error('Redis connection error', err); // Hata mesajı
redisClient.quit(); // Redis istemcisini kapatıyoruz
process.exit(1); // İşlemi sonlandırıyoruz
});
// Siparişleri oluştur ve kuyruğa gönder
const createOrders = async () => {
for (const order of orders) { // Her bir sipariş için döngü
console.log(` [x] ${order.product} order received from ${order.customer}`); // Sipariş alındığında bilgi mesajı
await redisClient.rPush(queueName, JSON.stringify(order)); // Siparişi JSON formatında kuyruğa ekliyoruz
await pause(1000); // İşlem süresini simüle etmek için duraklatma
}
};
// Kuyruktan siparişleri işle
const processOrders = async () => {
while (true) { // Sonsuz döngü ile sürekli sipariş kontrolü
const order = await redisClient.lPop(queueName); // Kuyruktan siparişi alıyoruz
if (order) { // Sipariş varsa
const parsedOrder = JSON.parse(order); // Siparişi JSON formatında çözümle
console.log(` [x] The order was completed and delivered to ${parsedOrder.customer}`); // Sipariş tamamlandığında bilgi mesajı
await pause(3000); // İşlem süresini simüle etmek için duraklatma
} else { // Sipariş yoksa
console.log('No orders left'); // Kuyrukta sipariş kalmadığında bilgi mesajı
break; // Döngüyü sonlandır
}
}
};
Buraya kadar okuduğunuz için teşekkür ederim. Bir sonraki yazımda görüşmek dileğiyle :).
Kod: https://github.com/ExorTek/nodejs-message-queue-examples