NodeJS’te Concurrency ve Parallelism

Mehmet Demirel
10 min readJan 16, 2024

--

Herkese merhaba,

Bu gün Node.js’in temel özelliklerinden biri; sistem kaynaklarının verimli bir şekilde kullanılmasını sağlayan ve uygulama performansını artıran eşzamanlılık(Concurrency) ve paralelliği(Parallelism) işleme yeteneğini ele alacağız.Node.js’te concurrency ve parallelism kavramlarını inceleyecek, bunları elde etmek için çeşitli teknikleri tartışacak ve akılda tutulması gereken önemli hususları vurgulayacağız.

Başlamadan önce bazı konulara hakim olmanız gerekiyor; Event Loop, Sync ve Async .

Concurrency (Eşzamanlılık)

Node.js, tek iş parçacığıyla çalışmasına rağmen birçok görevi aynı anda yönetme kapasitesine sahiptir. Bu, Node.js’in eşzamanlılık (concurrency) özelliğinden kaynaklanmaktadır. Concurrency, bir uygulamanın aynı anda birden fazla görevi veya işlemi yürütme yeteneğidir. Node.js’in concurrency özelliği, bloklamayan G/Ç (Input/Output — Girdi/Çıktı) modeli (non-blocking I/O model) ve asenkron (asynchronous) bir yapı üzerine kurulmuştur. Bu özellikler, Node.js’in eşzamanlı işlemleri etkili bir şekilde yönetmesini sağlar. Concurrency model, yürütme iş parçacığını engellemeden çok sayıda I/O işlemini aynı anda gerçekleştirmesine olanak tanır. Bu, Node.js’in birçok isteği eşzamanlı olarak ele almasını sağlayarak uygulama performansını önemli ölçüde artırır. Concurrency özelliğini sağlayan temel yapı, olay güdümlü (event-driven) bir mimaridir. Event-driven architecture, non-blocking I/O model kullanır. Bu, bir sonrakine geçmeden önce her bir isteğin tamamlanmasını beklemeden birden fazla isteği aynı anda işleyebilme yeteneğini sağlar. Olay döngüsü (event-loop), event-driven architecture’ın temelini oluşturur. Event-loop, sürekli olarak kontrol edilecek yeni olayları yönetir ve bunları uygun işleyicilere gönderir. Bu sayede çok sayıda I/O işlemi aynı anda çalışabilir, bu da çok sayıda isteği hızlı ve verimli bir şekilde işlemenin mümkün olmasını sağlar. Event-driven architecture, özellikle web sunucuları veya gerçek zamanlı veri akışı uygulamaları gibi ağır I/O işlemleri içeren uygulamalar için büyük fayda sağlar. Bu uygulamalarda, çok sayıda isteği eşzamanlı olarak işlemek kritik öneme sahiptir.

Node.js, bu concurrency yeteneğini, single-threaded (tek iş parçacıklı) bir event-loop architecture kullanarak elde eder. Ancak, Node.js single-threaded olmasına rağmen, çok çekirdekli CPU’lardan yararlanmak için her çekirdekte birden fazla Node.js örneği çalıştırabilir. Bu özellik, Node.js’in performansını artırarak uygulama yanıt verme hızını ve ölçeklenebilirliği geliştirir.

Concurrency Patterns

Callbacks

Callbacks, asenkron işlemleri yönetmenin ve concurrency uygulamanın temel mekanizmasıdır. Asenkron programlamada, bir görev tamamlandığında çağrılan ve mevcut görevin tamamlanmasını beklemeden programın diğer görevlere devam etmesini sağlayan fonksiyonlardır. Callbacks, Node.js’in concurrency özelliğinin temelini oluşturur. Callbacks kullanarak, bir dizi asenkron işlemi tek bir iş parçacığında yönetebilir ve eşzamanlılığı sağlayabilirsiniz.

const fs = require('fs'); // fs (file system)

fs.readFile('myFile.txt', (error, data) => {
if (error) {
console.log('File read errror:', error)
return false;
}
console.log(data.toString())
});

Bu kod,fs modülünü kullanarak tamamlandığında callback function çağırır. Callback function, bir hata oluşursa hata mesajını yazdırır. Hata oluşmazsa, dosyanın içeriğini yazdırır.

Promises

Promises, asenkron işlemleri kapsüllemek (encapsulate) ve callback temelli kodu basitleştirmek için kullanılan bir sözdizimidir. Promises, daha temiz ve anlaşılması kolay bir kod yazmanıza olanak tanır.Promises kullanarak, asenkron işlemleri senkron bir şekilde yazabilirsiniz. Bu, kodunuzun daha okunaklı ve yönetilebilir olmasını sağlar.

const fs = require('fs');

const readFilePromise = (filePath) => {
return new Promise((resolve, reject) => {
fs.readFile(filePath, (error, data) => {
if (error) {
reject(error);
} else {
resolve(data);
}
});
});
};

readFilePromise('myFile.txt')
.then((data) => {
console.log(data.toString());
})
.catch((error) => {
console.log('File read error:', error);
});

Bu kod,fs modülünün readFile fonksiyonunu kapsülleyen readFilePromise işlevini kullanır. readFilePromise işlevi, bir dosya yolu alır ve bir Promise döndürür. İşlev, dosya okuma işlemi tamamlandığında resolve çağrısını, bir hata oluştuğunda reject çağrısını gerçekleştirir.

Kod, readFilePromise işlevini kullanarak myFile.txt dosyasını okur. İşlem başarıyla tamamlanırsa, dosyanın içeriği konsola yazdırılır. Aksi takdirde(başarısız olması durumunda), hata mesajı konsola yazdırılır. Bu şekilde, dosya okuma işlemi asenkron olarak gerçekleştirilir ve Promiseler kullanılarak işlemler daha düzenli bir yapıda yönetilir.

Async/Await

Asenkron programlamada (Asynchronous Programming) kullanılan bir desen olarak karşımıza çıkar ve Promises ile birlikte kullanılır. Promises’ın özelliklerini kullanarak, asenkron kodu daha etkili bir şekilde yazma olanağı sağlar. Async/Await, Promises’ı kullanarak asenkron kodunuzu daha anlaşılır ve okunabilir hale getirmenize yardımcı olur. Async/Await, asenkron programlama dünyasında daha etkili bir yazım tarzı sunar.

const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

const taskHandler = async ({
name,
email
}) => {
await pause(1000)
console.log(`Hello ${name}, we sent an e-mail to ${email}.`)
}

const taskDispatcher = async () => {

const userMehmet = {
name: 'Mehmet',
email: 'mehmet@mehmet.com'
}

const userKadir = {
name: 'Kadir',
email: 'kadir@kadir.com'
}

await taskHandler(userMehmet)
await taskHandler(userKadir)
return {
success: true,
message: 'All task completed.'
}
};

const executeTasks = async () => {
const result = await taskDispatcher();
console.log(result)
}

executeTasks()

Bu kod, pause() ve taskHandler() adlı iki fonksiyon içerir. pause() fonksiyonu, zaman gecikmeleri simüle etmek için Promise döndürür. taskHandler() fonksiyonu ise kullanıcı bilgilerini alarak bir mesaj yazdırır.

Ana fonksiyon olan taskDispatcher(), taskHandler() fonksiyonunu farklı kullanıcı bilgileriyle çağırarak görevleri yönetir. executeTasks() fonksiyonu ise taskDispatcher() fonksiyonunu çağırarak işlemleri başlatır ve sonuçları konsola yazdırır.

Non-Blocking I/O

Geleneksel I/O işlemleri, dosya okuma ve ağ istekleri gibi, genellikle iş parçacığını engeller ve eşzamanlılığı zorlaştırır. Node.js, bu sorunu bloklamayan API’leri kullanarak çözer. Non-blocking I/O, bir görevin tamamlanmasını beklemeden diğer görevlere geçebileceğiniz anlamına gelir. Node.js’in non-blocking I/O desteği, eşzamanlılığı büyük ölçüde iyileştirir. Non-blocking I/O kullanarak, çok sayıda I/O işlemini aynı anda gerçekleştirebilir ve uygulamanızın performansını artırabilirsiniz.

const fs = require('node:fs/promises');
const https = require('https');

async function run() {
const fileContent = await fs.readFile('myFile.txt');
console.log(fileContent);

const request = https.request('https://reqres.in/api/users?page=2',
(response) => {
let responseData = '';
response.on('data', (chunk) => {
responseData += chunk;
});
response.on('end', () => {
console.log(JSON.parse(responseData));
});
});
request.end();
}

run();

Bu kod, node:fs/promises ve https modüllerini kullanarak dosya okuma ve HTTP isteği yapma işlemlerini gerçekleştirir.

İlk olarak, fs.readFile fonksiyonu kullanılarak myFile.txt dosyası okunur. Ardından, bu metin dizisi konsola yazdırılır.

Daha sonra, https.request fonksiyonu kullanılarak https://reqres.in/api/users?page=2 adresine bir HTTP GET isteği yapılır. İstek başlatıldıktan sonra, gelen veri parçaları birleştirilerek responseData değişkenine eklenir. İstek tamamlandığında, responseData değişkeni JSON olarak ayrıştırılarak konsola yazdırılır.

Bu şekilde, kod dosya okuma ve HTTP isteği yapma işlemlerini asenkron olarak gerçekleştirir ve sonuçları konsola yazdırır.

Event-Driven Architecture

Event-driven architecture (olay güdümlü mimari), Node.js’in eşzamanlılık özelliğini uygulamak için kullanılan bir başka yaygın desendir. Bu mimari, tek bir iş parçacığına vurgu yapar ve uzun süren görevlerin işletim sistemine devredilmesi üzerine kuruludur. Event-driven architecture kullanarak, bir dizi asenkron olayı tek bir iş parçacığında yönetebilir ve eşzamanlılığı sağlayabilirsiniz.

const { EventEmitter } = require('events');

const myEventEmitter = new EventEmitter();

const taskOne = () => {
console.log('Task one worked!');
};

const taskTwo = () => {
console.log('Task two worked!');
};

myEventEmitter.on('tasks', taskOne);
myEventEmitter.on('tasks', taskTwo);

myEventEmitter.emit('tasks');

Streams

Streams, büyük dosya işlemleri için ideal olan bir veri yapısıdır. Streams, verileri bellekte biriktirmeden sürekli olarak yönetir. Streams kullanarak, büyük dosya işlemleri için daha verimli ve etkili bir şekilde kod yazabilirsiniz.

const fs = require('fs');
const readline = require('readline');

const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

async function readFile() {
const fileStream = fs.createReadStream('myFile.txt', 'utf-8');
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});

for await (const line of rl) {
await pause(100); // tek tek okumayı görmek adına yerleştirildi
console.log(line);
}
}

readFile();

Workers and Thread Pools

Workers and thread pools (işçi ve iş parçacığı havuzları), CPU yoğun görevler için kullanılan bir concurrency pattern’dir. Bu pattern’ler, birden fazla çekirdeği kullanarak performansı artırmaya yardımcı olur. Workers and thread pools kullanarak, CPU yoğun görevlerinizi daha verimli bir şekilde işleyebilirsiniz.

const {
Worker,
isMainThread,
parentPort,
workerData
} = require('worker_threads');

if (isMainThread) {
const sharedData = [1, 2, 3, 4, 5];
const workerPool = new Set();
const results = [];
for (const data of sharedData) {
const worker = new Worker(__filename, {
workerData: {
data
}
});
worker.on('message', (result) => {
const workerId = worker.threadId;
console.log(`Worker ${workerId} result: ${result}`);
results.push({
workerId,
result
});
if (results.length === sharedData.length) {
console.log('All workers have completed.
The main thread continues...');
}
});

workerPool.add(worker);
}

for (const worker of workerPool) {
worker.postMessage('executeTask');
}
} else {
const inputData = workerData.data;
parentPort.on('message', (message) => {
if (message === 'executeTask') {
const result = performIntensiveTask(inputData);
parentPort.postMessage(result);
}
});
}

function performIntensiveTask(data) {
let result = 0;
for (let i = 0; i < data; i++) {
result += i;
}
return result;
}

Node.js ortamında çalışan bu kod, worker_threads modülünü kullanarak çoklu iş parçacığı özelliğini kullanır. Kodun ilk olarak yaptığı şey, worker_threads modülünden ilgili öğeleri almaktır. Bunlar Worker sınıfı, isMainThread özelliği, parentPort nesnesi ve workerData değişkenidir. Ardından, kodun ana iş parçacığı olup olmadığı isMainThread özelliğini kullanarak kontrol edilir. Eğer ana iş parçacığı ise, kod paylaşılan bir dizi oluşturur (sharedData) ve her bir veri için ayrı bir iş parçacığı oluşturur (Worker). Her bir iş parçacığına, işlem yapması için gerekli olan veri (workerData) ile birlikte postMessage yöntemi kullanılarak bir mesaj gönderilir. Her bir iş parçacığından gelen mesajlar dinlenir ve iş parçacığı tarafından tamamlanan işlemin sonucu alınarak sonuçlar dizisine eklenir. Tüm iş parçacıkları tamamlandığında bir mesaj yazdırılır. Ana iş parçacığı değilse, kod executeTask mesajını bekler. Bu mesaj geldiğinde, performIntensiveTask fonksiyonu çağrılarak işlem yapılır ve sonuç parentPort.postMessage yöntemi kullanılarak ana iş parçacığına gönderilir. Bu kod, çoklu iş parçacığı kullanarak paralel işlemleri gerçekleştirmeye yönelik bir örnektir. Ana iş parçacığı, paylaşılan veri kümesini iş parçacıklarına dağıtarak her bir iş parçacığının ayrı bir görevi eşzamanlı olarak gerçekleştirmesini sağlar.

Parallelism (Paralellik)

Parallelism, modern yazılım geliştirmenin temel bir ilkesidir ve performansın artırılması ile eş zamanlı işlemlerin verimli bir şekilde yönetilmesi için birden fazla CPU çekirdeğinin potansiyelinden yararlanmayı sağlar. Bu kavram, özünde, modern çok çekirdekli CPU’ların tüm yeteneklerini kullanarak birden fazla görevin aynı anda yürütülme kabiliyetini ifade eder. Parallelism, tek bir iş parçacığı(single-threaded)nın birden fazla görevi aynı anda yürütmesi anlamına gelmez. Aksine, her bir iş parçacığının kendi görevini gerçekleştirdiği bir yaklaşımı içerir. Varsayılan olarak, Node.js, I/O işlemlerini gerçekleştirmek için tek iş parçacıklı bir olay döngüsünden(event-loop) yararlanır. Bu durum, Node.js’in aynı anda yalnızca bir I/O işlemini işleyebileceği anlamına gelir. Ancak, Node.js’de parallelism sağlamak için birkaç mekanizma kullanılabilir. Bu mekanizmalar, çoklu görevlerin eş zamanlı olarak yürütülmesini mümkün kılar, özellikle Worker Threads ve Cluster modülü gibi araçlar sayesinde performansı artırabilir ve çok çekirdekli sistemlerin avantajlarından tam anlamıyla yararlanabilirsiniz.

Worker Threads & Child Process

Node.js, paralel işlemler için Worker Threads ve Child Process gibi güçlü araçlar sunar. Worker Threads ve Child Process kullanarak hem performans hem de paralelizm avantajları sağlar.

Worker Threads, tek bir Node.js işlemi içinde bağımsız Node.js iş parçacıkları oluşturmanıza ve çalıştırmanıza olanak tanır. Bu iş parçacıkları, birden fazla CPU çekirdeğinden yararlanarak CPU yoğun görevleri paralel olarak yürütebilir. İş parçacığı, Node.j kodunu yürüten bir birimdir. Worker Threads, ana olay döngüsünden ayrı olarak çalışan iş parçacıklarıdır. Bu, CPU yoğun görevlerin ana event-loop(olay göngüsü)’u engellemeden paralel olarak yürütülmesine olanak tanır. Worker Threads, geliştiricilere performans ve yanıt verme hızını artırma imkanı sunar. Örneğin, bir görüntü işleme uygulamasında, görüntü işleme işlemini bir Worker Thread’e taşıyarak ana olay döngüsünden ayırabilirsiniz. Bu, ana olay döngüsünün diğer görevleri yürütmeye odaklanmasına olanak tanıyarak performansı artırır. Her bir Worker Thread, kendi özel bellek alanına sahip olarak izolasyon ve güvenlik sağlar. Bu, Worker Thread’ler arasında veri çakışması riskini azaltır. Ayrıca, Worker Thread’ler arasındaki iletişim, mesajlaşma kullanılarak sağlanabilir.

Child Process, kendi bellek alanı ve olay döngüsüyle bağımsız olarak çalışan ayrı işlemlerdir. Bu, özellikle CPU yoğun görevlerin etkili bir şekilde paralel olarak çalıştırılmasını sağlar. Örneğin, görüntü işleme veya video dönüştürme gibi yoğun iş yükleri, çocuk süreçler kullanılarak performansı artırabilir. Ana olay döngüsünden bağımsız olarak çalışan ayrı işlemlerdir. Bu, CPU yoğun görevlerin ana olay döngüsünü engellemeden paralel olarak yürütülmesine olanak tanır.

// main.js
const { Worker } = require('worker_threads');
const { fork } = require('child_process');
const worker = new Worker('./worker.js');
const child = fork('./child.js');

worker.on('message', (message) => {
console.log('Received from Worker Thread:', message);
worker.terminate();
});

worker.postMessage('Hello from the main thread!');

child.on('message', (message) => {
console.log('Received from Child Process:', message);
child.kill();
});

child.send('Hello from the main thread!');

// child.js
process.on('message', (message) => {
console.log('Received from main thread in Child Process:', message);
process.send('Hello from the Child Process!');
process.exit();
});

// worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', (message) => {
console.log('Received from main thread in Worker Thread:', message);
parentPort.postMessage('Hello from the Worker Thread!');
parentPort.close();
});

Bu kod, worker.js ve child.js dosyalarını içeri aktarır. Ardından, ana thread’de new Worker() ve child_process.fork() yöntemlerini kullanarak sırasıyla bir Worker Thread ve bir Child Process oluşturur. Ana thread, on('message') olay dinleyicilerini kullanarak Worker Thread ve Child Process'ten gelen mesajları işler. Ana thread, postMessage() ve send() yöntemlerini kullanarak Worker Thread ve Child Process'e mesaj gönderir. İşler tamamlandığında, ana thread terminate(), kill() ve close() yöntemlerini kullanarak Worker Thread ve Child Process'i sonlandırır.

Cluster Module

Cluster module, uygulamaların performansını artırmak adına kritik bir rol oynar. Bu module, yük dengeleyici olarak işlev görerek aynı anda paylaşılan bir bağlantı noktasında çalışan Child Processes’e iş yükünü dengeler. Özellikle, Node.js’in engelleyici (blocking) kodlara etkili bir yanıt vermediği durumlarda oldukça etkilidir. Tek bir işlemci durumunda, ağır ve CPU yoğun bir işlemin meşguliyeti durumunda diğer istekler bu işlemin tamamlanmasını beklemek zorundadır. Ancak, bu durumu aşmak için birden fazla işlem kullanmak mümkündür. Bir işlemci yoğun bir işleme meşgulken, diğer işlemler gelen diğer istekleri alabilir ve diğer kullanılabilir CPU kaynaklarını kullanarak yükü dengeleyebilir. Node.js Cluster module, tek bir makinedeki birden çok CPU çekirdeğinden faydalanmak için kullanılır. Her biri kendi olay döngüsünü çalıştıran ve paylaşılan bir bağlantı noktasında dinleyen birden çok işçi süreci oluşturur. Bu sayede gelen istekler işçiler arasında dağıtılır ve sınırlı CPU kaynaklarıyla performansı ve ölçeklenebilirliği artırır. Geliştiricilere tek bir TCP veya HTTP sunucu portunu paylaşabilen birden fazla alt süreç oluşturmak için basit bir yol sunar. Bu özellik, gelen bağlantıları birden fazla alt sürece dağıtarak ağ odaklı uygulamaları ölçeklendirmeyi ve yükü dengelenmeyi kolaylaştırır. Cluster module, ana sürecin alt süreçleri yönettiği bir parent-child/parent-worker architecture(ana-çocuk/ana-işçi mimarisini) benimser. Gelen bağlantıları kabul eden ana süreç, yerleşik bir round-robin(varsayılan) algoritması kullanarak bunları alt süreçlere dağıtır. Bir alt süreç çöktüğünde veya yanıt veremez hale geldiğinde, Cluster module onu otomatik olarak yeniden başlatarak hata toleransı ve uygulama kullanılabilirliği sağlar. Cluster module, yüksek hacimli eşzamanlı ağ bağlantılarını yönetmek için özellikle kullanışlıdır, bu da onu web sunucularını, WebSocket sunucularını ve gerçek zamanlı sohbet uygulamalarını ölçeklendirmek için değerli bir araç haline getirir.

Cluster Module hakkında daha fazla bilgi için bu makaleme göz atabilirsiniz.

const http = require("node:http");
const { fork, isMaster } = require("node:cluster");
const { availableParallelism } = require("node:os");

const port = 5001;
const numCPUs = availableParallelism();

const startServer = (workerId) => {
const server = http.createServer((req, res) => {
res.setHeader("Worker-Id", workerId);
res.end("Hello World");
});

server.listen(port, () => {
console.log(`Server is running on port ${port} in worker ${workerId}`);
});
};

if (isMaster) {
console.log(`Master ${process.pid} is running`);
for (let i = 0; i < numCPUs; i++) {
const worker = fork();
worker.send({ workerId: worker.id });
}
} else {
process.on("message", (message) => {
if (message && message.workerId) {
startServer(message.workerId);
}
});
}

Bu kod, HTTP sunucusu oluşturmak ve çoklu işlemleri yönetmek için gerekli modüller olan, HTTP sunucusunu oluşturmak için http, çoklu süreçleri yönetmek için cluster ve mevcut CPU sayısını tespit etmek için os modüllerini kullanmaktadır. Mevcut CPU sayısı avilableParalellismfonksiyonu ile tespit numCPUsdeğişkenine aktarılır. startServerfonksiyonu, her bir işlemcinin kendi HTTP sunucusunu başlatmasını sağlayarak gelen isteklere “Hello World” yanıtını verirken, yanıt başlığına işçi sürecinin ID’sini eklemektedir. Ana süreç, mevcut CPU sayısı kadar işçi süreç başlatır ve her birine bir ID gönderir. İşçi süreçleri ise, ana süreçten gelen ID’yi aldıktan sonra startServerfonksiyonunu kullanarak kendi HTTP sunucularını başlatır ve port 5001'de dinlemeye başlarlar. Bu yapı, Node.js’in dahili yük dengeleyicisi sayesinde gelen istekleri işçi süreçleri arasında dağıtarak her CPU çekirdeğini etkin bir şekilde kullanır, böylece uygulamanın performansını artırır.

Concurrency & Paralellism

Buraya kadar okuduğunuz için teşekkür ederim. Olabildiğince bir çok konuya değinmek istedim ve umarım sizin ilginizi çekmiştir. Bir sonraki yazımda görüşmek dileğiyle :).

--

--