Concurrency Models and Parallel Processing
About 1598 words · About 20 min
2025-08-05
This section explores JavaScript's concurrency models, parallel processing capabilities, and advanced asynchronous programming patterns.
Concurrency Fundamentals
JavaScript is single-threaded but provides powerful concurrency mechanisms through the event loop and asynchronous programming.
The Event Loop Architecture
The event loop is the core of JavaScript's concurrency model:
// Event loop visualization
/**
 * Shows how synchronous code, promise callbacks (microtasks) and timers
 * (macrotasks) are interleaved by the event loop.
 *
 * Logs 'Start' and 'End' synchronously, and schedules two zero-delay timers
 * and two promise reactions that each log their own label.
 */
function eventLoopExample() {
  // Builds a callback that logs a fixed label when it eventually runs.
  const logLater = (label) => () => {
    console.log(label);
  };

  console.log('Start');

  setTimeout(logLater('Timeout 1'), 0);
  Promise.resolve().then(logLater('Promise 1'));
  setTimeout(logLater('Timeout 2'), 0);
  Promise.resolve().then(logLater('Promise 2'));

  console.log('End');
}
// Execution order:
// 1. Start
// 2. End
// 3. Promise 1 (microtask)
// 4. Promise 2 (microtask)
// 5. Timeout 1 (macrotask)
// 6. Timeout 2 (macrotask)
Task Queues
JavaScript has multiple task queues with different priorities:
- Microtask Queue: Higher priority, includes Promise callbacks
- Macrotask Queue: Lower priority, includes setTimeout, setInterval, I/O callbacks
// Microtask vs Macrotask demonstration
/**
 * Schedules a mix of macrotasks and microtasks and logs a label from each,
 * bracketed by synchronous 'Script start' / 'Script end' logs.
 *
 * Uses setImmediate, so it needs a runtime that provides it.
 */
function queueDemo() {
  // Builds a callback that logs a fixed label when it eventually runs.
  const say = (text) => () => console.log(text);

  console.log('Script start');

  // Macrotasks
  setTimeout(say('setTimeout'), 0);
  setImmediate(say('setImmediate'));

  // Microtasks
  Promise.resolve().then(say('Promise 1'));
  Promise.resolve().then(() => {
    console.log('Promise 2');
    // Scheduled while the microtask queue is already draining.
    Promise.resolve().then(say('Promise 3'));
  });

  // Queued from synchronous code, after the two promise reactions above.
  queueMicrotask(say('queueMicrotask'));

  console.log('Script end');
}
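// Output order (Node.js; setImmediate is not available in browsers):
// Script start
// Script end
// Promise 1
// Promise 2
// queueMicrotask (queued before Promise 3, which is only scheduled while Promise 2 runs)
// Promise 3
// setTimeout
// setImmediate
// Note: the relative order of setTimeout(..., 0) and setImmediate is not
// guaranteed when both are scheduled from the main module.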
Advanced Asynchronous Patterns
Async Generators
// Async generators for streaming data
/**
 * Async generator that yields the integers 0 through 4, waiting on a
 * 1000 ms timer before each value.
 *
 * @yields {number} The next counter value.
 */
async function* asyncGenerator() {
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  for (let value = 0; value < 5; value += 1) {
    await pause(1000);
    yield value;
  }
}
// Consuming async generators
/**
 * Iterates asyncGenerator() with `for await` and logs every value it yields.
 *
 * @returns {Promise<void>} Resolves once the generator is exhausted.
 */
async function consumeAsyncGenerator() {
  for await (const received of asyncGenerator()) {
    console.log('Received:', received);
  }
}
// Async generator with error handling
/**
 * Async generator that handles its own failure.
 *
 * Yields 0, 1 and 2 (each after a 500 ms timer), then throws internally when
 * the counter reaches 3. The error is caught inside the generator, logged
 * with console.error, and a final 'error-handled' marker is yielded.
 *
 * @yields {number|string} Counter values, then the 'error-handled' marker.
 */
async function* safeAsyncGenerator() {
  const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  try {
    let counter = 0;
    while (counter < 5) {
      if (counter === 3) {
        throw new Error('Something went wrong');
      }
      await wait(500);
      yield counter;
      counter += 1;
    }
  } catch (error) {
    console.error('Generator error:', error);
    yield 'error-handled';
  }
}
Async Iterators and Transform Streams
// Custom async iterator
/**
 * Async iterator over an in-memory array.
 *
 * The instance is its own async iterator: `[Symbol.asyncIterator]()` returns
 * `this`, and `next()` hands out one element per call after a 100 ms timer.
 */
class AsyncDataSource {
  /**
   * @param {Array} data - Elements to hand out, in order.
   */
  constructor(data) {
    this.data = data;
    this.index = 0;
  }

  /**
   * @returns {Promise<{done: boolean, value: *}>} The next iteration result;
   *   `{ done: true, value: undefined }` once every element was consumed.
   */
  async next() {
    const exhausted = this.index >= this.data.length;
    if (exhausted) {
      return { done: true, value: undefined };
    }

    // Simulate async operation
    await new Promise((resolve) => setTimeout(resolve, 100));

    // The cursor is read and advanced only after the wait has finished.
    const value = this.data[this.index];
    this.index += 1;
    return { done: false, value };
  }

  [Symbol.asyncIterator]() {
    return this;
  }
}
// Using async iterator
/**
 * Iterates an AsyncDataSource holding the numbers 1 to 5 and logs each item.
 *
 * @returns {Promise<void>} Resolves when the source is exhausted.
 */
async function processAsyncData() {
  const payload = [1, 2, 3, 4, 5];
  for await (const entry of new AsyncDataSource(payload)) {
    console.log('Processing:', entry);
  }
}
// Transform stream pattern
/**
 * Applies an (optionally async) mapping function to every item of an async
 * iterable, producing a new async iterable of the mapped values.
 */
class AsyncTransform {
  /**
   * @param {function(*): (*|Promise<*>)} transformFn - Mapping applied to each item.
   */
  constructor(transformFn) {
    this.transformFn = transformFn;
  }

  /**
   * @param {AsyncIterable|Iterable} source - Items to transform.
   * @yields {*} The awaited result of `transformFn` for each source item, in order.
   */
  async *process(source) {
    for await (const input of source) {
      // Each item is fully transformed before the next one is pulled.
      yield await this.transformFn(input);
    }
  }
}
// Usage
// Source of the numbers 1..5, consumed by runTransform below.
const numbers = new AsyncDataSource([1, 2, 3, 4, 5]);

// Transform that doubles each value after a 50 ms timer.
const doubler = new AsyncTransform(async (value) => {
  await new Promise((done) => setTimeout(done, 50));
  return value * 2;
});

/**
 * Pipes `numbers` through `doubler` and logs every transformed value.
 *
 * @returns {Promise<void>} Resolves when the source is exhausted.
 */
async function runTransform() {
  const doubled = doubler.process(numbers);
  for await (const result of doubled) {
    console.log('Transformed:', result);
  }
}
Parallel Processing
Web Workers for CPU-Intensive Tasks
// Main thread worker management
/**
 * Fixed-size pool of Web Workers with a FIFO task queue.
 *
 * Each job gets a numeric id; the worker script is expected to reply with
 * `{ jobId, result }` on success or `{ jobId, error }` on failure.
 *
 * Every promise returned by `execute()` is guaranteed to settle: it resolves
 * or rejects on the worker's reply, rejects if the worker raises an `error`
 * event while running the job, and rejects if the pool is terminated first.
 */
class WorkerPool {
  /**
   * @param {string|URL} workerScript - Script URL passed to `new Worker(...)`.
   * @param {number} [poolSize=4] - Number of workers to create.
   */
  constructor(workerScript, poolSize = 4) {
    this.workerScript = workerScript;
    this.poolSize = poolSize;
    this.workers = [];
    this.taskQueue = [];
    this.activeJobs = new Map();
    this.jobIdCounter = 0;
    this.terminated = false;
    this.initializeWorkers();
  }

  /** Creates `poolSize` workers and wires their message/error handlers. */
  initializeWorkers() {
    for (let i = 0; i < this.poolSize; i++) {
      const worker = new Worker(this.workerScript);
      worker.onmessage = (event) => this.handleWorkerMessage(worker, event);
      worker.onerror = (error) => this.handleWorkerError(worker, error);
      // jobId remembers what the worker is running so an error event can be
      // attributed to the right job.
      this.workers.push({ worker, busy: false, jobId: null });
    }
  }

  /**
   * Settles the job named in the worker's reply and frees the worker.
   *
   * @param {Worker} worker - Worker that posted the message.
   * @param {MessageEvent} event - Carries `{ jobId, result, error }` in `data`.
   */
  handleWorkerMessage(worker, event) {
    const { jobId, result, error } = event.data;
    const workerObj = this.workers.find((w) => w.worker === worker);
    if (workerObj) {
      workerObj.busy = false;
      workerObj.jobId = null;
    }
    const job = this.activeJobs.get(jobId);
    if (job) {
      this.activeJobs.delete(jobId);
      if (error) {
        job.reject(error);
      } else {
        job.resolve(result);
      }
    }
    this.processQueue();
  }

  /**
   * Handles a worker `error` event: logs it, rejects the job that worker was
   * running (otherwise its promise would never settle) and frees the worker.
   *
   * @param {Worker} worker - Worker that raised the error.
   * @param {ErrorEvent|Error} error - The error event or error object.
   */
  handleWorkerError(worker, error) {
    console.error('Worker error:', error);
    const workerObj = this.workers.find((w) => w.worker === worker);
    if (workerObj) {
      const { jobId } = workerObj;
      workerObj.busy = false;
      workerObj.jobId = null;
      const job = jobId === null ? undefined : this.activeJobs.get(jobId);
      if (job) {
        this.activeJobs.delete(jobId);
        const reason = error instanceof Error
          ? error
          : new Error((error && error.message) || 'Worker error');
        job.reject(reason);
      }
    }
    this.processQueue();
  }

  /**
   * Queues `data` for processing by the next free worker.
   *
   * @param {*} data - Payload posted to the worker as `{ jobId, data }`.
   * @returns {Promise<*>} Resolves with the worker's `result`; rejects with
   *   the worker's `error`, a worker error event, or on pool termination.
   */
  execute(data) {
    if (this.terminated) {
      return Promise.reject(new Error('Worker pool terminated'));
    }
    return new Promise((resolve, reject) => {
      const jobId = this.jobIdCounter++;
      this.activeJobs.set(jobId, { resolve, reject });
      this.taskQueue.push({ jobId, data });
      this.processQueue();
    });
  }

  /** Hands queued tasks to idle workers until one of the two runs out. */
  processQueue() {
    while (this.taskQueue.length > 0) {
      const availableWorker = this.workers.find((w) => !w.busy);
      if (!availableWorker) {
        return;
      }
      const task = this.taskQueue.shift();
      availableWorker.busy = true;
      availableWorker.jobId = task.jobId;
      availableWorker.worker.postMessage({
        jobId: task.jobId,
        data: task.data
      });
    }
  }

  /**
   * Terminates every worker and rejects all jobs that have not settled yet,
   * whether queued or in flight. Later `execute()` calls reject immediately.
   */
  terminate() {
    this.terminated = true;
    this.workers.forEach(({ worker }) => worker.terminate());
    this.workers = [];
    this.taskQueue = [];
    const pendingJobs = [...this.activeJobs.values()];
    this.activeJobs.clear();
    pendingJobs.forEach((job) => job.reject(new Error('Worker pool terminated')));
  }
}
// Worker script content
// Source text of the worker, kept as a string so it can be wrapped in a Blob
// and loaded through an object URL (see parallelProcessingExample).
// Protocol: the worker receives `{ jobId, data }`, runs heavyComputation(data)
// (which loops `data.iterations` times) and replies with `{ jobId, result }`,
// or with `{ jobId, error: <message string> }` if the computation throws.
const workerScript = `
self.onmessage = function(event) {
const { jobId, data } = event.data;
try {
// CPU-intensive task
const result = heavyComputation(data);
self.postMessage({ jobId, result });
} catch (error) {
self.postMessage({ jobId, error: error.message });
}
};
function heavyComputation(data) {
// Simulate heavy computation
let result = 0;
for (let i = 0; i < data.iterations; i++) {
result += Math.sqrt(i) * Math.sin(i);
}
return result;
}
`;
// Usage
/**
 * Runs four computations of different sizes in parallel on a 4-worker pool.
 *
 * The worker source is loaded from a Blob object URL. The pool is terminated
 * and the URL revoked in `finally`, whether or not the jobs succeed.
 *
 * @returns {Promise<void>} Resolves after the results have been logged.
 */
async function parallelProcessingExample() {
  const scriptBlob = new Blob([workerScript], { type: 'application/javascript' });
  const workerUrl = URL.createObjectURL(scriptBlob);
  const pool = new WorkerPool(workerUrl, 4);

  try {
    const iterationCounts = [1000000, 2000000, 1500000, 3000000];
    const pending = iterationCounts.map((iterations) => pool.execute({ iterations }));
    const results = await Promise.all(pending);
    console.log('Parallel results:', results);
  } finally {
    pool.terminate();
    URL.revokeObjectURL(workerUrl);
  }
}
SharedArrayBuffer and Atomics
// Shared memory between workers
/**
 * Increments every element of a shared Int32Array by one, splitting the work
 * across several workers that all write to the same SharedArrayBuffer using
 * `Atomics.add`.
 */
class SharedMemoryProcessor {
  /**
   * @param {number} [size=1024] - Buffer size in bytes. It is viewed as an
   *   Int32Array, so it has to be a multiple of 4.
   */
  constructor(size = 1024) {
    this.sharedBuffer = new SharedArrayBuffer(size);
    this.sharedArray = new Int32Array(this.sharedBuffer);
    this.workers = [];
    // Object URLs backing the workers; revoked in terminate().
    this.workerUrls = [];
  }

  /**
   * Creates one worker from an inline script and registers it for cleanup.
   *
   * @returns {Worker} The new worker.
   */
  createWorker() {
    const workerScript = `
self.onmessage = function(event) {
const { sharedBuffer, startIndex, length } = event.data;
const sharedArray = new Int32Array(sharedBuffer);
// Process shared memory
for (let i = startIndex; i < startIndex + length; i++) {
// Atomic operation
Atomics.add(sharedArray, i, 1);
}
self.postMessage({ done: true });
};
`;
    const blob = new Blob([workerScript], { type: 'application/javascript' });
    const workerUrl = URL.createObjectURL(blob);
    const worker = new Worker(workerUrl);
    this.workers.push(worker);
    this.workerUrls.push(workerUrl);
    return worker;
  }

  /**
   * Splits the array into `workerCount` contiguous, non-overlapping integer
   * ranges and has one new worker increment each range.
   *
   * @param {number} [workerCount=4] - Number of workers to spawn for this run.
   * @returns {Promise<Int32Array>} The shared array once every worker has
   *   reported completion.
   * @throws {RangeError} If `workerCount` is not a positive integer.
   * @throws {Error} If any worker raises an `error` event.
   */
  async processParallel(workerCount = 4) {
    if (!Number.isInteger(workerCount) || workerCount < 1) {
      throw new RangeError('workerCount must be a positive integer');
    }

    const total = this.sharedArray.length;
    const pending = [];
    for (let i = 0; i < workerCount; i++) {
      // Integer boundaries: a fractional chunk size would make neighbouring
      // workers overlap on some cells.
      const startIndex = Math.floor((i * total) / workerCount);
      const endIndex = Math.floor(((i + 1) * total) / workerCount);

      const worker = this.createWorker();
      const promise = new Promise((resolve, reject) => {
        worker.onmessage = () => resolve();
        // Without this a crashing worker would leave the promise pending forever.
        worker.onerror = (event) => {
          reject(new Error((event && event.message) || 'Worker error'));
        };
      });
      worker.postMessage({
        sharedBuffer: this.sharedBuffer,
        startIndex,
        length: endIndex - startIndex
      });
      pending.push(promise);
    }

    await Promise.all(pending);
    return this.sharedArray;
  }

  /** Terminates all workers created so far and releases their object URLs. */
  terminate() {
    this.workers.forEach((worker) => worker.terminate());
    this.workers = [];
    this.workerUrls.forEach((workerUrl) => URL.revokeObjectURL(workerUrl));
    this.workerUrls = [];
  }
}
// Usage
/**
 * Runs a SharedMemoryProcessor over a 4096-byte buffer and logs the result.
 * The processor is always terminated, even if processing fails.
 *
 * @returns {Promise<void>} Resolves after the result has been logged.
 */
async function sharedMemoryExample() {
  const bufferBytes = 4096;
  const processor = new SharedMemoryProcessor(bufferBytes);

  try {
    const sharedResult = await processor.processParallel();
    console.log('Shared array result:', sharedResult);
  } finally {
    processor.terminate();
  }
}
Advanced Concurrency Patterns
Semaphore and Mutex Implementation
// Semaphore for limiting concurrent operations
/**
 * Counting semaphore that caps how many async tasks run at the same time.
 * Callers beyond the limit wait in FIFO order.
 */
class Semaphore {
  /**
   * @param {number} maxConcurrency - Maximum number of simultaneous holders.
   */
  constructor(maxConcurrency) {
    this.maxConcurrency = maxConcurrency;
    this.currentConcurrency = 0;
    this.waitQueue = [];
  }

  /**
   * Takes a permit, waiting in line if none is free.
   *
   * @returns {Promise<void>} Resolves once the caller holds a permit.
   */
  async acquire() {
    const atCapacity = this.currentConcurrency >= this.maxConcurrency;
    if (atCapacity) {
      return new Promise((wake) => {
        this.waitQueue.push(wake);
      });
    }
    this.currentConcurrency += 1;
    return Promise.resolve();
  }

  /**
   * Gives a permit back. If someone is waiting, the permit is handed straight
   * to the first waiter, so the count is decremented and then re-incremented.
   */
  release() {
    this.currentConcurrency -= 1;
    if (this.waitQueue.length === 0) {
      return;
    }
    const wakeNext = this.waitQueue.shift();
    this.currentConcurrency += 1;
    wakeNext();
  }

  /**
   * Runs `task` while holding a permit; the permit is released even if the
   * task throws.
   *
   * @param {function(): (*|Promise<*>)} task - Work to run under the limit.
   * @returns {Promise<*>} The task's result.
   */
  async execute(task) {
    await this.acquire();
    try {
      const outcome = await task();
      return outcome;
    } finally {
      this.release();
    }
  }
}
// Usage
/**
 * Starts ten one-second tasks through a Semaphore limited to three
 * concurrent operations and logs the collected results.
 *
 * @returns {Promise<void>} Resolves after all tasks have completed.
 */
async function semaphoreExample() {
  const semaphore = new Semaphore(3); // Max 3 concurrent operations

  const runTask = (i) => semaphore.execute(async () => {
    console.log(`Task ${i} started`);
    await new Promise((done) => setTimeout(done, 1000));
    console.log(`Task ${i} completed`);
    return `Result ${i}`;
  });

  const tasks = [];
  for (let i = 0; i < 10; i += 1) {
    tasks.push(runTask(i));
  }

  const results = await Promise.all(tasks);
  console.log('All tasks completed:', results);
}
// Mutex for exclusive access
/**
 * Async mutex: at most one holder at a time, waiters served in FIFO order.
 */
class Mutex {
  constructor() {
    this._locked = false;
    this._queue = [];
  }

  /**
   * Takes the lock, waiting in line if it is already held.
   *
   * @returns {Promise<void>} Resolves once the caller owns the lock.
   */
  async lock() {
    if (this._locked) {
      return new Promise((grant) => {
        this._queue.push(grant);
      });
    }
    this._locked = true;
    return Promise.resolve();
  }

  /**
   * Releases the lock. With waiters present, ownership passes directly to the
   * first one and the lock stays marked as held; otherwise it becomes free.
   */
  unlock() {
    if (this._queue.length === 0) {
      this._locked = false;
      return;
    }
    const grantNext = this._queue.shift();
    grantNext();
  }

  /**
   * Runs `task` while holding the lock; the lock is released even if the
   * task throws.
   *
   * @param {function(): (*|Promise<*>)} task - Critical section.
   * @returns {Promise<*>} The task's result.
   */
  async execute(task) {
    await this.lock();
    try {
      const outcome = await task();
      return outcome;
    } finally {
      this.unlock();
    }
  }
}
Producer-Consumer Pattern
// Producer-Consumer with async queue
/**
 * Unbounded async FIFO queue that can be consumed with `for await`.
 *
 * Items pushed while a consumer is waiting are delivered to it directly;
 * otherwise they are buffered. After `close()`, buffered items can still be
 * drained, after which consumers receive `{ done: true }`.
 */
class AsyncQueue {
  constructor() {
    this.items = [];
    // Resolve callbacks of consumers currently blocked in pop().
    this.consumerPromises = [];
    this.closed = false;
  }

  /**
   * Adds an item to the queue.
   *
   * @param {*} item - Value to enqueue.
   * @returns {Promise<void>}
   * @throws {Error} 'Queue is closed' if the queue was already closed.
   */
  async push(item) {
    if (this.closed) {
      throw new Error('Queue is closed');
    }
    if (this.consumerPromises.length === 0) {
      this.items.push(item);
      return;
    }
    const deliver = this.consumerPromises.shift();
    deliver({ value: item, done: false });
  }

  /**
   * Removes the oldest item, waiting for one if the queue is empty and open.
   *
   * @returns {Promise<{value: *, done: boolean}>} An iterator-style result.
   */
  async pop() {
    const hasBuffered = this.items.length > 0;
    if (hasBuffered) {
      return { value: this.items.shift(), done: false };
    }
    return this.closed
      ? { value: undefined, done: true }
      : new Promise((resolve) => {
        this.consumerPromises.push(resolve);
      });
  }

  /** Closes the queue and tells every waiting consumer that it is done. */
  close() {
    this.closed = true;
    for (const notify of this.consumerPromises) {
      notify({ value: undefined, done: true });
    }
    this.consumerPromises = [];
  }

  [Symbol.asyncIterator]() {
    return this;
  }

  /** Async-iterator protocol: same as pop(). */
  async next() {
    return this.pop();
  }
}
// Producer function
/**
 * Pushes every item into the queue, waiting a random delay of up to one
 * second before each push, and closes the queue when it stops producing.
 *
 * The queue is closed in `finally`, so consumers iterating it are released
 * even if a push fails.
 *
 * @param {{push: function(*): Promise<void>, close: function(): void}} queue
 *   Destination queue.
 * @param {Iterable} items - Items to produce, in order.
 * @returns {Promise<void>} Resolves once all items were pushed; rejects with
 *   the push error otherwise.
 */
async function producer(queue, items) {
  try {
    for (const item of items) {
      await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000));
      await queue.push(item);
      console.log('Produced:', item);
    }
  } finally {
    queue.close();
  }
}
// Consumer function
/**
 * Consumes the queue with `for await`, logging each item and then waiting a
 * random delay of up to 500 ms. Logs a final message when iteration ends.
 *
 * @param {AsyncIterable} queue - Queue to read from.
 * @param {*} id - Label used in the log output.
 * @returns {Promise<void>} Resolves when the queue reports it is done.
 */
async function consumer(queue, id) {
  const simulateWork = () => new Promise((done) => {
    setTimeout(done, Math.random() * 500);
  });

  for await (const item of queue) {
    console.log(`Consumer ${id} consumed:`, item);
    await simulateWork();
  }

  console.log(`Consumer ${id} finished`);
}
// Usage
/**
 * Wires one producer of the numbers 1..10 to two consumers sharing a single
 * AsyncQueue, and waits for all three to finish.
 *
 * @returns {Promise<void>} Resolves after the completion message is logged.
 */
async function producerConsumerExample() {
  const queue = new AsyncQueue();
  const items = Array.from({ length: 10 }, (_, offset) => offset + 1);

  // Start the producer first, then both consumers; all run concurrently.
  const running = [
    producer(queue, items),
    consumer(queue, 1),
    consumer(queue, 2),
  ];
  await Promise.all(running);

  console.log('Producer-consumer example completed');
}
Circuit Breaker Pattern
// Circuit breaker for fault tolerance
/**
 * Circuit breaker for fault tolerance.
 *
 * States:
 * - CLOSED: calls pass through; consecutive failures are counted.
 * - OPEN: calls are rejected immediately until `resetTimeout` has elapsed.
 * - HALF_OPEN: trial calls pass through; `successThreshold` consecutive
 *   successes close the circuit, and any failure re-opens it at once.
 */
class CircuitBreaker {
  /**
   * @param {Object} [options]
   * @param {number} [options.failureThreshold=5] - Consecutive failures that open the circuit.
   * @param {number} [options.resetTimeout=60000] - Milliseconds to stay OPEN before a trial call.
   * @param {number} [options.monitoringPeriod=60000] - Stored on the instance; not used by this class.
   * @param {number} [options.successThreshold=3] - Consecutive HALF_OPEN successes needed to close.
   */
  constructor(options = {}) {
    this.failureThreshold = options.failureThreshold || 5;
    this.resetTimeout = options.resetTimeout || 60000;
    this.monitoringPeriod = options.monitoringPeriod || 60000;
    this.successThreshold = options.successThreshold || 3;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.lastFailureTime = null;
    this.nextAttemptTime = null;
    this.successCount = 0;
  }

  /**
   * Runs `fn` through the breaker.
   *
   * @param {function(): (*|Promise<*>)} fn - Operation to protect.
   * @returns {Promise<*>} The operation's result.
   * @throws {Error} 'Circuit breaker is OPEN' while the circuit is open and
   *   the reset timeout has not elapsed; otherwise whatever `fn` throws.
   */
  async execute(fn) {
    if (this.state === 'OPEN') {
      if (Date.now() < this.nextAttemptTime) {
        throw new Error('Circuit breaker is OPEN');
      }
      // Timeout elapsed: allow trial calls, starting a fresh success streak.
      this.state = 'HALF_OPEN';
      this.successCount = 0;
    }
    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Records a success; closes the circuit after enough HALF_OPEN successes. */
  onSuccess() {
    this.failureCount = 0;
    if (this.state === 'HALF_OPEN') {
      this.successCount++;
      if (this.successCount >= this.successThreshold) {
        this.state = 'CLOSED';
        this.successCount = 0;
      }
    }
  }

  /**
   * Records a failure. Opens the circuit when the failure threshold is
   * reached, or immediately if the failure happened during a HALF_OPEN trial
   * (the failure counter alone cannot be used there because a preceding
   * trial success resets it to zero).
   */
  onFailure() {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    const trialFailed = this.state === 'HALF_OPEN';
    if (trialFailed || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      this.successCount = 0;
      this.nextAttemptTime = Date.now() + this.resetTimeout;
    }
  }

  /**
   * @returns {{state: string, failureCount: number, lastFailureTime: ?number, nextAttemptTime: ?number}}
   *   Snapshot of the breaker's current state.
   */
  getState() {
    return {
      state: this.state,
      failureCount: this.failureCount,
      lastFailureTime: this.lastFailureTime,
      nextAttemptTime: this.nextAttemptTime
    };
  }
}
// Usage
/**
 * Walks a CircuitBreaker through a failure/recovery cycle against a service
 * that fails until told otherwise: five attempts one second apart, a
 * six-second wait, then one more attempt after the service has recovered.
 *
 * @returns {Promise<void>} Resolves when the walkthrough is finished; errors
 *   from the final attempt are logged rather than propagated.
 */
async function circuitBreakerExample() {
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  const circuitBreaker = new CircuitBreaker({
    failureThreshold: 3,
    resetTimeout: 5000
  });

  // Simulate unreliable service
  let shouldFail = true;
  const unreliableService = async () => {
    if (!shouldFail) {
      return 'Service response';
    }
    throw new Error('Service unavailable');
  };

  try {
    // Attempts made while the service is still failing.
    for (let attempt = 0; attempt < 5; attempt += 1) {
      try {
        const result = await circuitBreaker.execute(unreliableService);
        console.log('Success:', result);
      } catch (error) {
        console.log('Failed:', error.message);
      }
      await sleep(1000);
    }

    console.log('Circuit state:', circuitBreaker.getState());

    // Wait for reset timeout
    await sleep(6000);

    // Now service should work
    shouldFail = false;
    const result = await circuitBreaker.execute(unreliableService);
    console.log('Final success:', result);
  } catch (error) {
    console.error('Circuit breaker error:', error.message);
  }
}
Performance Optimization
Batch Processing
// Batch processor for efficient operations
/**
 * Collects items and hands them to `processFn` in batches, either when
 * `batchSize` items have accumulated or when `batchTimeout` milliseconds have
 * passed since the first item of the current batch was added.
 */
class BatchProcessor {
  /**
   * @param {Object} [options]
   * @param {number} [options.batchSize=10] - Items per batch.
   * @param {number} [options.batchTimeout=1000] - Max wait in ms before a partial batch is processed.
   * @param {function(Array): (*|Promise<*>)} options.processFn - Called with each batch.
   * @param {function(*): void} [options.onError] - Receives errors from
   *   timer-triggered batches, which have no caller to reject. Defaults to
   *   logging with console.error.
   */
  constructor(options = {}) {
    this.batchSize = options.batchSize || 10;
    this.batchTimeout = options.batchTimeout || 1000;
    this.processFn = options.processFn;
    this.onError = options.onError || ((error) => {
      console.error('Batch processing failed:', error);
    });
    this.currentBatch = [];
    this.timeoutId = null;
    // Most recently started batch that is still running, or null.
    this.processingPromise = null;
    // Every batch that is still running; flush() waits for all of them.
    this.inFlight = new Set();
  }

  /**
   * Adds an item. Processes the batch right away when it becomes full;
   * otherwise makes sure the timeout is armed and waits for any batch that is
   * currently being processed.
   *
   * @param {*} item - Item to batch.
   * @returns {Promise<void>}
   */
  async add(item) {
    this.currentBatch.push(item);
    if (this.currentBatch.length >= this.batchSize) {
      return this.processBatch();
    }
    if (!this.timeoutId) {
      this.timeoutId = setTimeout(() => {
        // Nobody awaits a timer-triggered batch, so route its failure to
        // onError instead of leaving an unhandled rejection.
        this.processBatch().catch((error) => this.onError(error));
      }, this.batchTimeout);
    }
    // Wait for processing if we're currently processing
    if (this.processingPromise) {
      await this.processingPromise;
    }
  }

  /**
   * Processes the items collected so far as one batch. Does nothing when the
   * current batch is empty.
   *
   * @returns {Promise<void>} Settles with the outcome of `processFn`.
   */
  async processBatch() {
    if (this.currentBatch.length === 0) {
      return;
    }
    if (this.timeoutId) {
      clearTimeout(this.timeoutId);
      this.timeoutId = null;
    }
    const batch = [...this.currentBatch];
    this.currentBatch = [];

    // The Promise executor turns a synchronous throw from processFn into a
    // rejection, so cleanup below always runs.
    const run = new Promise((resolve) => resolve(this.processFn(batch)));
    this.processingPromise = run;
    this.inFlight.add(run);
    try {
      await run;
    } finally {
      this.inFlight.delete(run);
      // Only clear the marker if no newer batch has replaced it meanwhile.
      if (this.processingPromise === run) {
        this.processingPromise = null;
      }
    }
  }

  /**
   * Processes any remaining items and waits until every batch that is still
   * running has settled.
   *
   * @returns {Promise<void>} Rejects only if processing the remaining items fails.
   */
  async flush() {
    await this.processBatch();
    // Failures of those other batches were already delivered to whoever
    // started them (add() caller or onError), so here we only wait.
    const settled = [...this.inFlight].map((run) => run.catch(() => {}));
    await Promise.all(settled);
  }
}
// Usage
/**
 * Feeds the numbers 0..7 into a BatchProcessor (batch size 3, 2 s timeout),
 * pausing 300 ms after each one, then flushes whatever is left.
 *
 * @returns {Promise<void>} Resolves after the completion message is logged.
 */
async function batchProcessingExample() {
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  const batchProcessor = new BatchProcessor({
    batchSize: 3,
    batchTimeout: 2000,
    processFn: async (batch) => {
      console.log('Processing batch:', batch);
      await pause(500);
      console.log('Batch processed:', batch);
    }
  });

  // Add items
  let next = 0;
  while (next < 8) {
    await batchProcessor.add(next);
    await pause(300);
    next += 1;
  }

  // Flush remaining items
  await batchProcessor.flush();
  console.log('Batch processing completed');
}
This comprehensive coverage of concurrency models and parallel processing provides advanced techniques for building efficient, scalable JavaScript applications that can handle complex asynchronous operations and parallel workloads.
Changelog
2aa48
- web-deploy(Auto): Update base URL for web-pages branch
Copyright
Copyright Ownership:WARREN Y.F. LONG
License under:Attribution-NonCommercial-NoDerivatives 4.0 International (CC-BY-NC-ND-4.0)