🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples de Multithreading Web TypeScript
Exemples de multithreading Web TypeScript incluant les Web Workers, les opérations asynchrones et le traitement parallèle
💻 Web Workers typescript
🟡 intermediate
⭐⭐⭐
Créer et utiliser des Web Workers pour une exécution parallèle dans le navigateur
⏱️ 30 min
🏷️ typescript, web, multithreading
Prerequisites:
Intermediate TypeScript, Web Workers API
// Web TypeScript Web Workers Examples
// Using Web Workers for parallel execution and background processing
// 1. Basic Web Worker Creation
/**
 * Registry of dedicated Web Workers addressed by name.
 *
 * Workers can be created from an external script URL or from inline source
 * code (via a Blob URL). Blob URLs minted for inline workers are remembered
 * and revoked when the worker is terminated, so repeated create/terminate
 * cycles do not accumulate object URLs for the lifetime of the page.
 */
class WorkerManager {
  private workers: Map<string, Worker> = new Map();
  /** Blob URL backing each inline worker, so it can be revoked on termination. */
  private objectUrls: Map<Worker, string> = new Map();

  /**
   * Create a worker from a string of JavaScript source and register it.
   *
   * @param name - Key under which the worker is stored; an existing entry
   *   with the same name is overwritten (the previous worker is not terminated).
   * @param workerCode - JavaScript source executed inside the worker.
   * @returns The newly created worker.
   */
  createInlineWorker(name: string, workerCode: string): Worker {
    const blob = new Blob([workerCode], { type: 'application/javascript' });
    const workerUrl = URL.createObjectURL(blob);
    let worker: Worker;
    try {
      worker = new Worker(workerUrl);
    } catch (error) {
      // Construction failed: nothing will ever load this URL, release it now.
      URL.revokeObjectURL(workerUrl);
      throw error;
    }
    this.objectUrls.set(worker, workerUrl);
    this.workers.set(name, worker);
    return worker;
  }

  /**
   * Create a worker from an external script file and register it.
   *
   * @param name - Key under which the worker is stored.
   * @param workerFile - URL of the worker script.
   * @returns The newly created worker.
   */
  createWorker(name: string, workerFile: string): Worker {
    const worker = new Worker(workerFile);
    this.workers.set(name, worker);
    return worker;
  }

  /**
   * Look up a registered worker.
   *
   * @param name - Name the worker was registered under.
   * @returns The worker, or `undefined` when no worker has that name.
   */
  getWorker(name: string): Worker | undefined {
    return this.workers.get(name);
  }

  /**
   * Terminate and unregister a worker. Unknown names are ignored.
   *
   * @param name - Name the worker was registered under.
   */
  terminateWorker(name: string): void {
    const worker = this.workers.get(name);
    if (worker) {
      this.dispose(worker);
      this.workers.delete(name);
    }
  }

  /** Terminate every registered worker and release all inline-worker Blob URLs. */
  terminateAllWorkers(): void {
    for (const worker of this.workers.values()) {
      worker.terminate();
    }
    this.workers.clear();
    // Also covers URLs of inline workers whose name was overwritten earlier.
    for (const url of this.objectUrls.values()) {
      URL.revokeObjectURL(url);
    }
    this.objectUrls.clear();
  }

  /** Terminate one worker and revoke its Blob URL if it was created inline. */
  private dispose(worker: Worker): void {
    worker.terminate();
    const url = this.objectUrls.get(worker);
    if (url !== undefined) {
      URL.revokeObjectURL(url);
      this.objectUrls.delete(worker);
    }
  }
}
// 2. Worker Communication
/**
 * Thin messaging wrapper around a single dedicated {@link Worker}.
 *
 * Offers fire-and-forget posting, handler registration, and a one-shot
 * request/response helper ({@link WorkerCommunication.sendMessage}).
 */
class WorkerCommunication {
  private worker: Worker;

  /**
   * @param worker - The worker all messages are sent to and received from.
   */
  constructor(worker: Worker) {
    this.worker = worker;
  }

  /**
   * Post a message to the worker.
   *
   * @param message - Structured-cloneable payload.
   * @param transfer - Optional list of objects whose ownership is transferred.
   */
  postMessage(message: any, transfer?: Transferable[]): void {
    if (transfer) {
      this.worker.postMessage(message, transfer);
    } else {
      // Do not forward an explicit `undefined` as the transfer list.
      this.worker.postMessage(message);
    }
  }

  /**
   * Install `handler` as the worker's `onmessage` callback (replaces any previous one).
   *
   * @param handler - Called for every message the worker posts.
   */
  onMessage(handler: (message: MessageEvent) => void): void {
    this.worker.onmessage = handler;
  }

  /**
   * Install `handler` as the worker's `onerror` callback (replaces any previous one).
   *
   * @param handler - Called when the worker raises an error event.
   */
  onError(handler: (error: ErrorEvent) => void): void {
    this.worker.onerror = handler;
  }

  /**
   * Post `message` and resolve with the data of the next message the worker sends.
   *
   * Both temporary listeners are removed as soon as the promise settles, so
   * repeated calls do not accumulate stale listeners on the worker.
   *
   * @param message - Structured-cloneable payload.
   * @returns The `data` of the next `message` event.
   * @throws Rejects with the worker's error (or an `Error` built from the event
   *   message when the event carries no error object), or with whatever
   *   `postMessage` throws.
   */
  async sendMessage(message: any): Promise<any> {
    return new Promise((resolve, reject) => {
      const cleanup = () => {
        this.worker.removeEventListener('message', handleMessage);
        this.worker.removeEventListener('error', handleError);
      };
      const handleMessage = (e: MessageEvent) => {
        cleanup();
        resolve(e.data);
      };
      const handleError = (e: ErrorEvent) => {
        cleanup();
        // `error` may be empty on worker error events; fall back to the message text.
        reject(e.error ?? new Error(e.message));
      };
      this.worker.addEventListener('message', handleMessage);
      this.worker.addEventListener('error', handleError);
      try {
        this.worker.postMessage(message);
      } catch (error) {
        cleanup();
        reject(error);
      }
    });
  }
}
// 3. Worker Task Executor
/**
 * Runs one-off computations in short-lived inline workers.
 *
 * Every call gets its own uniquely named worker, so concurrent calls never
 * share or terminate each other's workers, and workers are always terminated
 * through the manager (success or failure) so nothing is left registered.
 */
class WorkerTaskExecutor {
  private workerManager: WorkerManager;
  /** Monotonic counter used to give every spawned worker a unique name. */
  private nextWorkerId = 0;

  constructor() {
    this.workerManager = new WorkerManager();
  }

  /**
   * Run the built-in sample computation (doubles `input.value`) in a worker.
   *
   * @param input - Object with a numeric `value` property.
   * @returns `{ result: input.value * 2 }` as posted back by the worker.
   * @throws Rejects if the worker raises an error.
   */
  async executeComputation(input: any): Promise<any> {
    const workerCode = `
      self.onmessage = function(e) {
        const input = e.data;
        const result = performComputation(input);
        self.postMessage(result);
      };
      function performComputation(input) {
        // Perform computation here
        return { result: input.value * 2 };
      }
    `;
    const name = `computation-${this.nextWorkerId++}`;
    const worker = this.workerManager.createInlineWorker(name, workerCode);
    try {
      return await new WorkerCommunication(worker).sendMessage(input);
    } finally {
      // Terminate even when the worker failed, otherwise it would leak.
      this.workerManager.terminateWorker(name);
    }
  }

  /**
   * Run `workerCode` once per input, each in its own worker, all in parallel.
   *
   * @param inputs - One message per worker.
   * @param workerCode - Worker source; must post exactly one reply per message.
   * @returns Worker replies in the same order as `inputs`.
   * @throws Rejects with the first worker error; all workers are terminated either way.
   */
  async executeParallel(inputs: any[], workerCode: string): Promise<any[]> {
    const names: string[] = [];
    try {
      const pending: Promise<any>[] = [];
      for (const input of inputs) {
        // A counter (not indexOf) keeps names unique even for duplicate inputs.
        const name = `worker-${this.nextWorkerId++}`;
        const worker = this.workerManager.createInlineWorker(name, workerCode);
        names.push(name);
        pending.push(new WorkerCommunication(worker).sendMessage(input));
      }
      return await Promise.all(pending);
    } finally {
      // Go through the manager so the workers are also removed from its registry.
      for (const name of names) {
        this.workerManager.terminateWorker(name);
      }
    }
  }
}
// 4. Worker Pool
/** A queued or in-flight pool task together with the settle functions of its promise. */
type WorkerPoolTask = { task: any; resolve: (value: any) => void; reject: (reason: any) => void };

/**
 * Fixed-size pool of identical inline workers with a FIFO task queue.
 *
 * Each worker handles one task at a time; the task a worker is running is
 * tracked per worker, so replies are always routed to the promise of the task
 * that produced them even when several workers are busy simultaneously.
 * The worker script is expected to post exactly one message per task.
 */
class WorkerPool {
  private workers: Worker[] = [];
  private taskQueue: WorkerPoolTask[] = [];
  private availableWorkers: Worker[] = [];
  /** Task currently assigned to each busy worker. */
  private activeTasks = new Map<Worker, WorkerPoolTask>();
  /** Blob URL shared by all workers; revoked on terminate(). */
  private workerUrl: string | null = null;
  private terminated = false;

  /**
   * @param workerCode - JavaScript source run by every worker in the pool.
   * @param poolSize - Number of workers to create; must be greater than zero.
   * @throws RangeError when `poolSize` is not a positive number.
   */
  constructor(workerCode: string, poolSize: number) {
    if (!(poolSize > 0)) {
      throw new RangeError('poolSize must be greater than 0');
    }
    // One Blob URL is enough: every worker runs the same script.
    const blob = new Blob([workerCode], { type: 'application/javascript' });
    const workerUrl = URL.createObjectURL(blob);
    this.workerUrl = workerUrl;
    for (let i = 0; i < poolSize; i++) {
      const worker = new Worker(workerUrl);
      worker.onmessage = (e) => {
        this.settle(worker, (task) => task.resolve(e.data));
      };
      worker.onerror = (e) => {
        // `error` may be empty on worker error events; fall back to the message text.
        this.settle(worker, (task) => task.reject(e.error ?? new Error(e.message)));
      };
      this.workers.push(worker);
      this.availableWorkers.push(worker);
    }
  }

  /**
   * Queue a task and resolve with the worker's reply.
   *
   * @param task - Structured-cloneable payload posted to a worker.
   * @returns The `data` of the worker's reply message.
   * @throws Rejects if the worker errors, the payload cannot be posted, or the
   *   pool is (or becomes) terminated before the task completes.
   */
  async execute(task: any): Promise<any> {
    if (this.terminated) {
      throw new Error('WorkerPool has been terminated');
    }
    return new Promise((resolve, reject) => {
      this.taskQueue.push({ task, resolve, reject });
      this.processQueue();
    });
  }

  /** Hand queued tasks to idle workers until one of the two runs out. */
  private processQueue(): void {
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const taskItem = this.taskQueue.shift()!;
      const worker = this.availableWorkers.shift()!;
      this.activeTasks.set(worker, taskItem);
      try {
        worker.postMessage(taskItem.task);
      } catch (error) {
        // Payload could not be cloned: fail this task only and free the worker.
        this.activeTasks.delete(worker);
        this.availableWorkers.push(worker);
        taskItem.reject(error);
      }
    }
  }

  /** Settle the task owned by `worker` (if any), then return the worker to the pool. */
  private settle(worker: Worker, finish: (task: WorkerPoolTask) => void): void {
    const task = this.activeTasks.get(worker);
    if (!task) {
      return; // Unsolicited message/error from an idle worker.
    }
    this.activeTasks.delete(worker);
    finish(task);
    this.releaseWorker(worker);
  }

  /** Mark `worker` idle and dispatch the next queued task, if any. */
  private releaseWorker(worker: Worker): void {
    this.availableWorkers.push(worker);
    this.processQueue();
  }

  /**
   * Terminate all workers, reject every queued and in-flight task, and
   * release the shared Blob URL. Subsequent `execute` calls reject.
   */
  terminate(): void {
    this.terminated = true;
    this.workers.forEach((worker) => worker.terminate());
    const abandoned = [...this.activeTasks.values(), ...this.taskQueue];
    this.workers = [];
    this.availableWorkers = [];
    this.taskQueue = [];
    this.activeTasks.clear();
    if (this.workerUrl !== null) {
      URL.revokeObjectURL(this.workerUrl);
      this.workerUrl = null;
    }
    // Settle instead of leaving callers awaiting forever.
    for (const task of abandoned) {
      task.reject(new Error('WorkerPool has been terminated'));
    }
  }
}
// 5. Shared Worker
/**
 * Holds a single SharedWorker built from inline source and exposes its port.
 *
 * Every port-related method throws `Error('Shared worker not created')` until
 * {@link SharedWorkerManager.createSharedWorker} has been called.
 */
class SharedWorkerManager {
  private sharedWorker: SharedWorker | null = null;

  /**
   * Build a SharedWorker from source code and remember it as the current one.
   *
   * @param name - Name passed to the SharedWorker constructor.
   * @param workerCode - JavaScript source of the worker.
   * @returns The created SharedWorker.
   */
  createSharedWorker(name: string, workerCode: string): SharedWorker {
    const source = new Blob([workerCode], { type: 'application/javascript' });
    const instance = new SharedWorker(URL.createObjectURL(source), name);
    this.sharedWorker = instance;
    return instance;
  }

  /**
   * Get the message port of the current shared worker.
   *
   * @returns The worker's `port`.
   * @throws Error when no shared worker has been created yet.
   */
  connect(): MessagePort {
    const current = this.sharedWorker;
    if (current === null) {
      throw new Error('Shared worker not created');
    }
    return current.port;
  }

  /** Start the port so queued messages begin to be delivered. */
  start(): void {
    this.connect().start();
  }

  /**
   * Send a message through the shared worker's port.
   *
   * @param message - Payload to post.
   */
  postMessage(message: any): void {
    this.connect().postMessage(message);
  }

  /**
   * Install `handler` as the port's `onmessage` callback.
   *
   * @param handler - Called for each message arriving on the port.
   */
  onMessage(handler: (message: MessageEvent) => void): void {
    this.connect().onmessage = handler;
  }
}
// 6. Worker with Progress Tracking
/**
 * Inline worker that reports progress while it runs a task.
 *
 * The worker script is expected to post messages of the form
 * `{ type: 'progress', value }`, `{ type: 'complete', result }` or
 * `{ type: 'error', message }`.
 */
class WorkerWithProgress {
  private worker: Worker;
  private communication: WorkerCommunication;
  /** Blob URL backing the worker; revoked on terminate(). */
  private workerUrl: string;

  /**
   * @param workerCode - JavaScript source executed inside the worker.
   */
  constructor(workerCode: string) {
    const blob = new Blob([workerCode], { type: 'application/javascript' });
    this.workerUrl = URL.createObjectURL(blob);
    this.worker = new Worker(this.workerUrl);
    this.communication = new WorkerCommunication(this.worker);
  }

  /**
   * Post `task` to the worker and track its progress.
   *
   * @param task - Payload posted to the worker.
   * @param onProgress - Called with `value` of every `progress` message.
   * @returns The `result` of the worker's `complete` message.
   * @throws Rejects on an `error` message or a worker error event.
   */
  async executeWithProgress(
    task: any,
    onProgress: (progress: number) => void
  ): Promise<any> {
    return new Promise((resolve, reject) => {
      // Register the handlers by CALLING onMessage/onError; assigning to them
      // would only overwrite the methods and never reach the worker.
      this.communication.onMessage((e: MessageEvent) => {
        const data = e.data;
        if (data?.type === 'progress') {
          onProgress(data.value);
        } else if (data?.type === 'complete') {
          resolve(data.result);
        } else if (data?.type === 'error') {
          reject(new Error(data.message));
        }
      });
      this.communication.onError((e: ErrorEvent) => {
        // `error` may be empty on worker error events; fall back to the message text.
        reject(e.error ?? new Error(e.message));
      });
      this.communication.postMessage(task);
    });
  }

  /** Terminate the worker and release its Blob URL. */
  terminate(): void {
    this.worker.terminate();
    URL.revokeObjectURL(this.workerUrl);
  }
}
// 7. Batch Processing with Workers
/**
 * Processes a list of items in fixed-size batches, one inline worker per batch.
 *
 * Batches run sequentially; within a batch the worker maps a user-supplied
 * `processItem` function over the items.
 */
class BatchWorkerProcessor {
  private workerManager = new WorkerManager();
  /** Monotonic counter so concurrent calls never share a worker name. */
  private nextWorkerId = 0;

  /**
   * Process `items` batch by batch.
   *
   * @param items - Items to process; must be structured-cloneable.
   * @param processorCode - JavaScript source that defines a global
   *   `processItem(item)` function inside the worker.
   * @param batchSize - Items per batch (default 10); must be at least 1.
   * @param onProgress - Optional callback invoked after each batch with the
   *   number of items processed so far and the total.
   * @returns Results in the same order as `items`.
   * @throws RangeError (as a rejection) when `batchSize` is less than 1.
   */
  async processBatch<T, R>(
    items: T[],
    processorCode: string,
    batchSize: number = 10,
    onProgress?: (current: number, total: number) => void
  ): Promise<R[]> {
    // A step of 0 (or a negative one) would make the slicing loop run forever.
    if (!(batchSize >= 1)) {
      throw new RangeError('batchSize must be at least 1');
    }
    const results: R[] = [];
    for (let start = 0; start < items.length; start += batchSize) {
      const batch = items.slice(start, start + batchSize);
      const batchResults = await this.processBatchImpl<T, R>(batch, processorCode);
      results.push(...batchResults);
      if (onProgress) {
        onProgress(Math.min(start + batchSize, items.length), items.length);
      }
    }
    return results;
  }

  /** Run one batch in a fresh worker, terminating it whether or not it succeeds. */
  private async processBatchImpl<T, R>(batch: T[], processorCode: string): Promise<R[]> {
    const workerCode = `
      ${processorCode}
      self.onmessage = function(e) {
        const batch = e.data;
        const results = batch.map(processItem);
        self.postMessage(results);
      };
    `;
    const name = `batch-${this.nextWorkerId++}`;
    const worker = this.workerManager.createInlineWorker(name, workerCode);
    try {
      return await new WorkerCommunication(worker).sendMessage(batch);
    } finally {
      this.workerManager.terminateWorker(name);
    }
  }
}
// 8. Worker Utilities
/**
 * Static helpers for feature detection and for running plain functions in workers.
 */
class WorkerUtils {
  /** @returns `true` when the `Worker` constructor is available. */
  static isSupported(): boolean {
    return typeof Worker !== 'undefined';
  }

  /** @returns `true` when the `SharedWorker` constructor is available. */
  static isSharedWorkerSupported(): boolean {
    return typeof SharedWorker !== 'undefined';
  }

  /**
   * Create a worker that applies `fn` to every message and posts the result back.
   *
   * `fn` is serialized with `toString()`, so it must be self-contained: it
   * cannot reference variables from its enclosing scope.
   * The Blob URL backing the worker is not revoked by this method because the
   * caller owns the worker's lifetime.
   *
   * @param fn - Function to run inside the worker.
   * @returns The created worker.
   */
  // eslint-disable-next-line @typescript-eslint/ban-types -- kept for API compatibility
  static createWorkerFromFunction(fn: Function): Worker {
    return WorkerUtils.spawn(fn).worker;
  }

  /**
   * Run `fn(input)` once in a temporary worker.
   *
   * The worker is terminated and its Blob URL revoked as soon as the call
   * settles, whatever the outcome.
   *
   * @param fn - Self-contained function executed in the worker.
   * @param input - Structured-cloneable argument passed to `fn`.
   * @returns The value `fn` returned inside the worker.
   * @throws Rejects with the worker's error (or an `Error` built from the event
   *   message when the event carries no error object), or with whatever
   *   `postMessage` throws for an uncloneable `input`.
   */
  static async executeInWorker<T, R>(fn: (input: T) => R, input: T): Promise<R> {
    const { worker, url } = WorkerUtils.spawn(fn);
    return new Promise<R>((resolve, reject) => {
      const dispose = () => {
        worker.terminate();
        URL.revokeObjectURL(url);
      };
      worker.onmessage = (e) => {
        dispose();
        resolve(e.data as R);
      };
      worker.onerror = (e) => {
        dispose();
        // `error` may be empty on worker error events; fall back to the message text.
        reject(e.error ?? new Error(e.message));
      };
      try {
        worker.postMessage(input);
      } catch (error) {
        dispose();
        reject(error);
      }
    });
  }

  /** Build the worker for `fn` and also hand back the Blob URL it was loaded from. */
  // eslint-disable-next-line @typescript-eslint/ban-types -- mirrors createWorkerFromFunction
  private static spawn(fn: Function): { worker: Worker; url: string } {
    const fnString = fn.toString();
    const workerCode = `
      const workerFunction = ${fnString};
      self.onmessage = (e) => {
        const result = workerFunction(e.data);
        self.postMessage(result);
      };
    `;
    const blob = new Blob([workerCode], { type: 'application/javascript' });
    const url = URL.createObjectURL(blob);
    return { worker: new Worker(url), url };
  }
}
// Usage Examples
/**
 * Walk through every Web Worker helper in this file, logging results to the console.
 *
 * Must run in an environment that provides `Worker`, `Blob` and
 * `URL.createObjectURL` (i.e. a browser).
 */
async function demonstrateWebWorkers() {
  console.log('=== Web TypeScript Web Workers Examples ===\n');

  // Check support
  console.log('--- Web Workers Support ---');
  console.log(`Web Workers supported: ${WorkerUtils.isSupported()}`);
  console.log(`Shared Workers supported: ${WorkerUtils.isSharedWorkerSupported()}`);

  // 1. Basic worker
  console.log('\n--- 1. Basic Web Worker ---');
  const workerManager = new WorkerManager();
  const basicWorkerCode = `
    self.onmessage = function(e) {
      const data = e.data;
      console.log('Worker received:', data);
      const result = data.value * 2;
      self.postMessage({ result: result });
    };
  `;
  const worker = workerManager.createInlineWorker('basic', basicWorkerCode);
  const comm = new WorkerCommunication(worker);
  // onMessage is a method that installs the handler; it must be called, not assigned to.
  comm.onMessage((e) => {
    console.log(`Worker response: ${JSON.stringify(e.data)}`);
  });
  comm.postMessage({ value: 21 });
  // Wait for response
  await new Promise(resolve => setTimeout(resolve, 100));
  workerManager.terminateWorker('basic');

  // 2. Execute computation in worker
  console.log('\n--- 2. Execute Computation ---');
  const executor = new WorkerTaskExecutor();
  const computationResult = await executor.executeComputation({ value: 10 });
  console.log(`Computation result: ${JSON.stringify(computationResult)}`);

  // 3. Parallel execution
  console.log('\n--- 3. Parallel Execution ---');
  const parallelWorkerCode = `
    self.onmessage = function(e) {
      const input = e.data;
      const result = input * input; // Square the input
      self.postMessage(result);
    };
  `;
  const inputs = [1, 2, 3, 4, 5];
  const parallelResults = await executor.executeParallel(inputs, parallelWorkerCode);
  console.log(`Parallel results: ${parallelResults.join(', ')}`);

  // 4. Worker with progress
  console.log('\n--- 4. Worker with Progress ---');
  const progressWorkerCode = `
    self.onmessage = function(e) {
      const task = e.data;
      // Simulate progress
      for (let i = 0; i <= 100; i += 10) {
        self.postMessage({ type: 'progress', value: i });
      }
      self.postMessage({ type: 'complete', result: task.input * 2 });
    };
  `;
  const progressWorker = new WorkerWithProgress(progressWorkerCode);
  const progressResult = await progressWorker.executeWithProgress(
    { input: 15 },
    (progress) => console.log(`Progress: ${progress}%`)
  );
  // executeWithProgress already resolves with the `result` field of the
  // 'complete' message, so the value itself is printed.
  console.log(`Result with progress: ${progressResult}`);
  progressWorker.terminate();

  // 5. Batch processing
  console.log('\n--- 5. Batch Processing ---');
  const batchProcessor = new BatchWorkerProcessor();
  const processorCode = `
    function processItem(item) {
      return item * item;
    }
  `;
  const items = Array.from({ length: 25 }, (_, i) => i + 1);
  const batchResults = await batchProcessor.processBatch(
    items,
    processorCode,
    5,
    (current, total) => console.log(`Batch progress: ${current}/${total}`)
  );
  console.log(`Batch results (first 10): ${batchResults.slice(0, 10).join(', ')}...`);

  // 6. Execute function in worker
  console.log('\n--- 6. Execute Function in Worker ---');
  const squareResult = await WorkerUtils.executeInWorker(
    (x: number) => x * x,
    7
  );
  console.log(`Square result: ${squareResult}`);

  // 7. Heavy computation
  console.log('\n--- 7. Heavy Computation ---');
  const heavyWorkerCode = `
    self.onmessage = function(e) {
      const n = e.data.n;
      let sum = 0;
      for (let i = 1; i <= n; i++) {
        sum += i;
      }
      self.postMessage({ sum: sum });
    };
  `;
  const heavyWorker = workerManager.createInlineWorker('heavy', heavyWorkerCode);
  const heavyComm = new WorkerCommunication(heavyWorker);
  const heavyResult = await heavyComm.sendMessage({ n: 1000000 });
  console.log(`Heavy computation result: Sum from 1 to 1000000 = ${heavyResult.sum}`);
  workerManager.terminateWorker('heavy');

  console.log('\n=== All Web Workers Examples Completed ===');
}
// Export classes
export { WorkerManager, WorkerCommunication, WorkerTaskExecutor, WorkerPool, SharedWorkerManager, WorkerWithProgress, BatchWorkerProcessor, WorkerUtils };
export { demonstrateWebWorkers };
💻 Opérations Asynchrones typescript
🟡 intermediate
⭐⭐⭐
Maîtriser les async/await, Promises et l'exécution concurrente dans TypeScript
⏱️ 25 min
🏷️ typescript, web, multithreading, async
Prerequisites:
Intermediate TypeScript, Async/Await
// Web TypeScript Async/Await Operations Examples
// Mastering Promises, async/await, and concurrent execution patterns
// 1. Promise Basics
/**
 * Small collection of Promise construction and control helpers.
 */
class PromiseBasics {
  /**
   * @param value - Value to wrap.
   * @returns A promise already resolved with `value`.
   */
  static resolve<T>(value: T): Promise<T> {
    return Promise.resolve(value);
  }

  /**
   * @param error - Rejection reason.
   * @returns A promise already rejected with `error`.
   */
  static reject<T>(error: T): Promise<never> {
    return Promise.reject(error);
  }

  /**
   * Adapt a Node-style `(error, result)` callback API to a promise.
   *
   * @param callback - Function that receives the node-style callback to invoke.
   * @returns Resolves with `result`, or rejects with `error` when it is truthy.
   */
  static fromCallback<T>(
    callback: (cb: (error: Error | null, result: T) => void) => void
  ): Promise<T> {
    return new Promise((resolve, reject) => {
      callback((error, result) => {
        if (error) {
          reject(error);
        } else {
          resolve(result);
        }
      });
    });
  }

  /**
   * @param ms - Delay in milliseconds.
   * @returns A promise that resolves after `ms` milliseconds.
   */
  static delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Race `promise` against a timer.
   *
   * The timer is cleared as soon as the race settles, so a fast promise does
   * not leave a pending timeout behind (which would keep a Node process alive).
   *
   * @param promise - Operation to bound.
   * @param timeoutMs - Maximum time to wait, in milliseconds.
   * @param errorMessage - Message of the Error used on timeout.
   * @returns The value of `promise` if it settles first.
   * @throws Rejects with `Error(errorMessage)` when the timer fires first.
   */
  static withTimeout<T>(promise: Promise<T>, timeoutMs: number, errorMessage: string = 'Timeout'): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const expiry = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(() => reject(new Error(errorMessage)), timeoutMs);
    });
    return Promise.race([promise, expiry]).finally(() => clearTimeout(timer));
  }

  /**
   * Call `fn` until it succeeds or the attempts are exhausted.
   *
   * The pause before attempt `k + 1` is `delayMs * k` (linear back-off).
   *
   * @param fn - Operation to attempt.
   * @param maxAttempts - Total number of attempts (default 3); must be at least 1.
   * @param delayMs - Base delay between attempts, in milliseconds (default 1000).
   * @returns The first successful result.
   * @throws RangeError (as a rejection) when `maxAttempts` is less than 1;
   *   otherwise rejects with the error of the last failed attempt.
   */
  static async retry<T>(
    fn: () => Promise<T>,
    maxAttempts: number = 3,
    delayMs: number = 1000
  ): Promise<T> {
    // With zero attempts there is no error to rethrow; fail explicitly instead
    // of throwing `undefined`.
    if (!(maxAttempts >= 1)) {
      throw new RangeError('maxAttempts must be at least 1');
    }
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return await fn();
      } catch (error) {
        lastError = error;
        if (attempt < maxAttempts) {
          await PromiseBasics.delay(delayMs * attempt);
        }
      }
    }
    throw lastError;
  }
}
// 2. Async Control Flow
/**
 * Helpers that control how groups of async tasks are scheduled.
 */
class AsyncControlFlow {
  /**
   * Run tasks one after another.
   *
   * @param tasks - Task factories, invoked in order.
   * @returns Results in task order.
   * @throws Rejects with the first task error; later tasks are not started.
   */
  static async sequential<T>(tasks: Array<() => Promise<T>>): Promise<T[]> {
    const results: T[] = [];
    for (const task of tasks) {
      results.push(await task());
    }
    return results;
  }

  /**
   * Run tasks concurrently with at most `limit` in flight at any time.
   *
   * Implemented as `limit` lanes that each pull the next unstarted task, so
   * the bound is strictly respected and every result is collected.
   *
   * @param tasks - Task factories; each is invoked at most once.
   * @param limit - Maximum number of concurrently running tasks. Values below
   *   1 (or NaN) are treated as 1.
   * @returns Results in the same order as `tasks`.
   * @throws Rejects with the first task error; no further tasks are started
   *   after a failure.
   */
  static async parallelWithLimit<T>(
    tasks: Array<() => Promise<T>>,
    limit: number
  ): Promise<T[]> {
    const width = limit >= 1 ? Math.floor(limit) : 1;
    const results: T[] = new Array(tasks.length);
    let nextIndex = 0;
    let failed = false;

    const lane = async (): Promise<void> => {
      while (!failed && nextIndex < tasks.length) {
        const index = nextIndex++;
        try {
          results[index] = await tasks[index]();
        } catch (error) {
          failed = true; // Stop the other lanes from picking up new work.
          throw error;
        }
      }
    };

    const laneCount = Math.min(width, tasks.length);
    await Promise.all(Array.from({ length: laneCount }, () => lane()));
    return results;
  }

  /**
   * Process items in consecutive batches; items inside a batch run in parallel.
   *
   * @param items - Items to process.
   * @param processor - Async function applied to each item.
   * @param batchSize - Items per batch; must be at least 1.
   * @returns Results in the same order as `items`.
   * @throws RangeError (as a rejection) when `batchSize` is less than 1.
   */
  static async batchParallel<T>(
    items: T[],
    processor: (item: T) => Promise<any>,
    batchSize: number
  ): Promise<any[]> {
    // A step of 0 (or a negative one) would make the loop below run forever.
    if (!(batchSize >= 1)) {
      throw new RangeError('batchSize must be at least 1');
    }
    const results: any[] = [];
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize);
      const batchResults = await Promise.all(batch.map((item) => processor(item)));
      results.push(...batchResults);
    }
    return results;
  }

  /**
   * Try `primary`; if it rejects, use `fallback` instead.
   *
   * @param primary - Preferred operation.
   * @param fallback - Operation used when `primary` fails (its error is discarded).
   * @returns The result of whichever operation succeeded.
   */
  static async withFallback<T>(
    primary: () => Promise<T>,
    fallback: () => Promise<T>
  ): Promise<T> {
    try {
      return await primary();
    } catch {
      return await fallback();
    }
  }

  /**
   * Wrap `fn` in a simple circuit breaker.
   *
   * After `maxFailures` consecutive failures the circuit opens and calls fail
   * immediately with `Error('Circuit breaker is open')` until `resetTimeout`
   * milliseconds have passed since the last failure.
   *
   * @param fn - Operation to protect.
   * @param options - `timeout` per call in ms (default 5000), `maxFailures`
   *   (default 5), `resetTimeout` in ms (default 60000).
   * @returns A function with the same result as `fn`, guarded by the breaker.
   */
  static withCircuitBreaker<T>(
    fn: () => Promise<T>,
    options: {
      timeout?: number;
      maxFailures?: number;
      resetTimeout?: number;
    } = {}
  ): () => Promise<T> {
    let failures = 0;
    let lastFailureTime = 0;
    let circuitOpen = false;
    const {
      timeout = 5000,
      maxFailures = 5,
      resetTimeout = 60000
    } = options;
    return async () => {
      if (circuitOpen) {
        if (Date.now() - lastFailureTime > resetTimeout) {
          // Cool-down elapsed: close the circuit and start counting afresh.
          circuitOpen = false;
          failures = 0;
        } else {
          throw new Error('Circuit breaker is open');
        }
      }
      try {
        const result = await PromiseBasics.withTimeout(fn(), timeout);
        failures = 0;
        return result;
      } catch (error) {
        failures++;
        lastFailureTime = Date.now();
        if (failures >= maxFailures) {
          circuitOpen = true;
        }
        throw error;
      }
    };
  }
}
// 3. Async Utilities
/**
 * Async counterparts of the common array helpers.
 *
 * `map` and `filter` start every callback up front and await them together;
 * `reduce`, `find`, `some` and `every` await one callback at a time, in order.
 */
class AsyncUtilities {
  /**
   * Apply `mapper` to every element concurrently.
   *
   * @param array - Source elements.
   * @param mapper - Async transform receiving the element and its index.
   * @returns Mapped values in source order.
   */
  static async map<T, R>(
    array: T[],
    mapper: (item: T, index: number) => Promise<R>
  ): Promise<R[]> {
    return Promise.all(array.map((element, position) => mapper(element, position)));
  }

  /**
   * Keep the elements for which `predicate` resolves truthy; predicates run concurrently.
   *
   * @param array - Source elements.
   * @param predicate - Async test receiving the element and its index.
   * @returns Retained elements in source order.
   */
  static async filter<T>(
    array: T[],
    predicate: (item: T, index: number) => Promise<boolean>
  ): Promise<T[]> {
    const verdicts = await Promise.all(
      array.map(async (item, index) => {
        const keep = await predicate(item, index);
        return { item, keep };
      })
    );
    const retained: T[] = [];
    for (const verdict of verdicts) {
      if (verdict.keep) {
        retained.push(verdict.item);
      }
    }
    return retained;
  }

  /**
   * Fold the array left to right, awaiting each reducer call before the next.
   *
   * @param array - Source elements.
   * @param reducer - Async combiner receiving accumulator, element and index.
   * @param initialValue - Starting accumulator.
   * @returns The final accumulator.
   */
  static async reduce<T, R>(
    array: T[],
    reducer: (accumulator: R, item: T, index: number) => Promise<R>,
    initialValue: R
  ): Promise<R> {
    let folded = initialValue;
    let position = 0;
    while (position < array.length) {
      folded = await reducer(folded, array[position], position);
      position += 1;
    }
    return folded;
  }

  /**
   * Return the first element whose predicate resolves truthy, testing in order.
   *
   * @param array - Source elements.
   * @param predicate - Async test for one element.
   * @returns The matching element, or `undefined` when none matches.
   */
  static async find<T>(
    array: T[],
    predicate: (item: T) => Promise<boolean>
  ): Promise<T | undefined> {
    for (let position = 0; position < array.length; position++) {
      const candidate = array[position];
      const matched = await predicate(candidate);
      if (matched) {
        return candidate;
      }
    }
    return undefined;
  }

  /**
   * Test in order whether at least one element satisfies `predicate`.
   *
   * @param array - Source elements.
   * @param predicate - Async test for one element.
   * @returns `true` at the first truthy result, otherwise `false`.
   */
  static async some<T>(
    array: T[],
    predicate: (item: T) => Promise<boolean>
  ): Promise<boolean> {
    for (let position = 0; position < array.length; position++) {
      const matched = await predicate(array[position]);
      if (matched) {
        return true;
      }
    }
    return false;
  }

  /**
   * Test in order whether all elements satisfy `predicate`.
   *
   * @param array - Source elements.
   * @param predicate - Async test for one element.
   * @returns `false` at the first falsy result, otherwise `true`.
   */
  static async every<T>(
    array: T[],
    predicate: (item: T) => Promise<boolean>
  ): Promise<boolean> {
    for (let position = 0; position < array.length; position++) {
      const matched = await predicate(array[position]);
      if (!matched) {
        return false;
      }
    }
    return true;
  }
}
// 4. Async Queue
/**
 * Unbounded FIFO queue whose `dequeue` waits when the queue is empty.
 *
 * Items are handed directly to the longest-waiting consumer when one exists;
 * otherwise they are buffered.
 */
class AsyncQueue<T> {
  private queue: T[] = [];
  private pendingWaiters: Array<(value: T) => void> = [];

  /**
   * Add an item, delivering it straight to a waiting consumer if there is one.
   *
   * @param item - Item to enqueue.
   */
  enqueue(item: T): void {
    const deliver = this.pendingWaiters.shift();
    if (deliver !== undefined) {
      deliver(item);
      return;
    }
    this.queue.push(item);
  }

  /**
   * Remove and return the oldest item, waiting for one if none is buffered.
   *
   * @returns The next item in FIFO order.
   */
  async dequeue(): Promise<T> {
    if (this.queue.length === 0) {
      return new Promise((deliver) => {
        this.pendingWaiters.push(deliver);
      });
    }
    return this.queue.shift()!;
  }

  /** Number of buffered items (waiting consumers are not counted). */
  get size(): number {
    return this.queue.length;
  }
}
// 5. Async Mutex
/**
 * Promise-based mutual exclusion lock for async code on a single thread.
 *
 * Waiters are served in FIFO order; on release the lock is handed directly to
 * the next waiter without ever becoming free in between.
 */
class AsyncMutex {
  private locked = false;
  private waiters: Array<() => void> = [];

  /**
   * Take the lock, waiting in line if it is currently held.
   *
   * @returns Resolves once the caller owns the lock.
   */
  async acquire(): Promise<void> {
    if (this.locked) {
      return new Promise((grant) => {
        this.waiters.push(grant);
      });
    }
    this.locked = true;
  }

  /** Give the lock to the next waiter, or mark it free when nobody is waiting. */
  release(): void {
    const grant = this.waiters.shift();
    if (grant !== undefined) {
      grant();
      return;
    }
    this.locked = false;
  }

  /**
   * Run `fn` while holding the lock; the lock is released even if `fn` rejects.
   *
   * @param fn - Critical section.
   * @returns Whatever `fn` resolves with.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      const outcome = await fn();
      return outcome;
    } finally {
      this.release();
    }
  }
}
// 6. Async Semaphore
/**
 * Promise-based counting semaphore for async code on a single thread.
 *
 * Waiters are served in FIFO order; a released permit goes straight to the
 * next waiter when there is one.
 */
class AsyncSemaphore {
  private permits: number;
  private waiters: Array<() => void> = [];

  /**
   * @param permits - Number of permits initially available.
   */
  constructor(permits: number) {
    this.permits = permits;
  }

  /**
   * Take one permit, waiting in line when none is available.
   *
   * @returns Resolves once the caller holds a permit.
   */
  async acquire(): Promise<void> {
    if (this.permits <= 0) {
      return new Promise((grant) => {
        this.waiters.push(grant);
      });
    }
    this.permits -= 1;
  }

  /** Hand the permit to the next waiter, or return it to the pool. */
  release(): void {
    const grant = this.waiters.shift();
    if (grant !== undefined) {
      grant();
      return;
    }
    this.permits += 1;
  }

  /**
   * Run `fn` while holding a permit; the permit is released even if `fn` rejects.
   *
   * @param fn - Operation to run under the semaphore.
   * @returns Whatever `fn` resolves with.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      const outcome = await fn();
      return outcome;
    } finally {
      this.release();
    }
  }
}
// 7. Async Cache
/**
 * Cache for asynchronously computed values with in-flight de-duplication.
 *
 * Concurrent requests for the same key share one computation. A failed
 * computation is not remembered, so the next request retries it.
 */
class AsyncCache<K, V> {
  private cache = new Map<K, V>();
  private pending = new Map<K, Promise<V>>();

  /**
   * Return the cached value for `key`, or compute, cache and return it.
   *
   * @param key - Cache key.
   * @param compute - Produces the value when it is neither cached nor in flight.
   * @returns The cached or freshly computed value.
   * @throws Rejects with the error of `compute`; nothing is cached in that case.
   */
  async getOrCompute(key: K, compute: () => Promise<V>): Promise<V> {
    // Check cache
    if (this.cache.has(key)) {
      return this.cache.get(key) as V;
    }
    // Join a computation that is already running for this key
    const inFlight = this.pending.get(key);
    if (inFlight) {
      return inFlight;
    }
    // Compute new value
    const promise: Promise<V> = compute().then(
      (value) => {
        // Skip bookkeeping when clear() dropped this computation meanwhile.
        if (this.pending.get(key) === promise) {
          this.pending.delete(key);
          this.cache.set(key, value);
        }
        return value;
      },
      (error) => {
        // Forget the failure so a later call can retry.
        if (this.pending.get(key) === promise) {
          this.pending.delete(key);
        }
        throw error;
      }
    );
    this.pending.set(key, promise);
    return promise;
  }

  /**
   * @param key - Cache key.
   * @returns The cached value, or `undefined` when absent (in-flight values are not visible).
   */
  get(key: K): V | undefined {
    return this.cache.get(key);
  }

  /**
   * Store a value directly.
   *
   * @param key - Cache key.
   * @param value - Value to cache.
   */
  set(key: K, value: V): void {
    this.cache.set(key, value);
  }

  /** Drop all cached values and detach all in-flight computations. */
  clear(): void {
    this.cache.clear();
    this.pending.clear();
  }

  /** Number of cached values (in-flight computations are not counted). */
  get size(): number {
    return this.cache.size;
  }
}
// 8. Async Rate Limiter
/**
 * Limits how many tasks may START per time window.
 *
 * Up to `concurrency` tasks are started in each window of `intervalMs`
 * milliseconds; further tasks wait in FIFO order for the next window.
 * The internal timer only runs while there is something to account for, so
 * an idle limiter holds no timer and does not keep a Node process alive.
 */
class AsyncRateLimiter {
  private queue: Array<() => void> = [];
  /** Starts still allowed in the current window. */
  private available: number;
  private readonly limit: number;
  private readonly interval: number;
  private timer: ReturnType<typeof setInterval> | null = null;

  /**
   * @param concurrency - Maximum number of task starts per window; at least 1.
   * @param intervalMs - Window length in milliseconds; must not be negative.
   * @throws RangeError when either argument is out of range.
   */
  constructor(concurrency: number, intervalMs: number) {
    if (!(concurrency >= 1)) {
      throw new RangeError('concurrency must be at least 1');
    }
    if (!(intervalMs >= 0)) {
      throw new RangeError('intervalMs must not be negative');
    }
    this.limit = Math.floor(concurrency);
    this.available = this.limit;
    this.interval = intervalMs;
  }

  /**
   * Run `fn` as soon as the rate limit allows.
   *
   * @param fn - Operation to run.
   * @returns Whatever `fn` resolves with.
   * @throws Rejects with whatever `fn` throws or rejects with.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const task = async () => {
        try {
          resolve(await fn());
        } catch (error) {
          reject(error);
        }
      };
      if (this.available > 0) {
        this.available--;
        void task();
      } else {
        this.queue.push(() => { void task(); });
      }
      this.ensureTimer();
    });
  }

  /** Start the window timer if it is not already running. */
  private ensureTimer(): void {
    if (this.timer === null) {
      this.timer = setInterval(() => this.onWindowElapsed(), this.interval);
    }
  }

  /** Begin a new window: refill the allowance and start queued tasks. */
  private onWindowElapsed(): void {
    // Nothing was started last window and nothing is waiting: go idle.
    if (this.queue.length === 0 && this.available === this.limit) {
      if (this.timer !== null) {
        clearInterval(this.timer);
        this.timer = null;
      }
      return;
    }
    this.available = this.limit;
    while (this.available > 0 && this.queue.length > 0) {
      this.available--;
      this.queue.shift()!();
    }
  }
}
// Usage Examples
/**
 * Walk through the async helpers in this file, logging results to the console.
 */
async function demonstrateAsyncOperations() {
  console.log('=== Web TypeScript Async/Await Operations Examples ===\n');

  // 1. Promise basics
  console.log('--- 1. Promise Basics ---');
  const resolved = await PromiseBasics.resolve(42);
  console.log(`Resolved: ${resolved}`);
  try {
    await PromiseBasics.reject(new Error('Test error'));
  } catch (error) {
    console.log(`Rejected: ${(error as Error).message}`);
  }
  await PromiseBasics.delay(100);
  console.log('Delay completed');

  // 2. Sequential vs parallel
  console.log('\n--- 2. Sequential vs Parallel ---');
  const tasks = [
    () => PromiseBasics.delay(100).then(() => 1),
    () => PromiseBasics.delay(200).then(() => 2),
    () => PromiseBasics.delay(150).then(() => 3)
  ];
  const sequentialStart = Date.now();
  const sequentialResults = await AsyncControlFlow.sequential(tasks);
  console.log(`Sequential: ${sequentialResults.join(', ')} (${Date.now() - sequentialStart}ms)`);
  // Separate start time: otherwise the parallel figure would include the
  // duration of the sequential run above.
  const parallelStart = Date.now();
  const parallelResults = await Promise.all(tasks.map(t => t()));
  console.log(`Parallel: ${parallelResults.join(', ')} (${Date.now() - parallelStart}ms)`);

  // 3. Parallel with limit
  console.log('\n--- 3. Parallel with Limit ---');
  const limitStartTime = Date.now();
  const limitedResults = await AsyncControlFlow.parallelWithLimit(tasks, 2);
  console.log(`Limited (2): ${limitedResults.join(', ')} (${Date.now() - limitStartTime}ms)`);

  // 4. Async map
  console.log('\n--- 4. Async Map ---');
  const numbers = [1, 2, 3, 4, 5];
  const doubled = await AsyncUtilities.map(numbers, async (n) => {
    await PromiseBasics.delay(50);
    return n * 2;
  });
  console.log(`Doubled: ${doubled.join(', ')}`);

  // 5. Async filter
  console.log('\n--- 5. Async Filter ---');
  const evens = await AsyncUtilities.filter(numbers, async (n) => {
    await PromiseBasics.delay(10);
    return n % 2 === 0;
  });
  console.log(`Even numbers: ${evens.join(', ')}`);

  // 6. Async queue
  console.log('\n--- 6. Async Queue ---');
  const queue = new AsyncQueue<number>();
  queue.enqueue(1);
  queue.enqueue(2);
  queue.enqueue(3);
  console.log(`Queue size: ${queue.size}`);
  console.log(`Dequeue: ${await queue.dequeue()}`);
  console.log(`Dequeue: ${await queue.dequeue()}`);

  // 7. Async mutex
  console.log('\n--- 7. Async Mutex ---');
  const mutex = new AsyncMutex();
  const mutexTask = mutex.execute(async () => {
    console.log('Critical section start');
    await PromiseBasics.delay(100);
    console.log('Critical section end');
    return 'done';
  });
  console.log(`Mutex result: ${await mutexTask}`);

  // 8. Async cache
  console.log('\n--- 8. Async Cache ---');
  const cache = new AsyncCache<string, string>();
  const computeResult1 = await cache.getOrCompute('key1', async () => {
    console.log('Computing value for key1');
    await PromiseBasics.delay(100);
    return 'value1';
  });
  const computeResult2 = await cache.getOrCompute('key1', async () => {
    console.log('This should not print');
    return 'value1-different';
  });
  console.log(`Cache results: ${computeResult1}, ${computeResult2}`);
  console.log(`Cache size: ${cache.size}`);

  // 9. Retry
  console.log('\n--- 9. Retry ---');
  let attempts = 0;
  const retryResult = await PromiseBasics.retry(async () => {
    attempts++;
    console.log(`Attempt ${attempts}`);
    if (attempts < 3) {
      throw new Error('Not yet');
    }
    return 'success';
  }, 5, 50);
  console.log(`Retry result: ${retryResult}`);

  // 10. Timeout
  console.log('\n--- 10. Timeout ---');
  try {
    await PromiseBasics.withTimeout(
      PromiseBasics.delay(2000).then(() => 'slow'),
      100,
      'Too slow'
    );
  } catch (error) {
    console.log(`Timeout error: ${(error as Error).message}`);
  }

  console.log('\n=== All Async/Await Examples Completed ===');
}
// Export classes
export { PromiseBasics, AsyncControlFlow, AsyncUtilities, AsyncQueue, AsyncMutex, AsyncSemaphore, AsyncCache, AsyncRateLimiter };
export { demonstrateAsyncOperations };
💻 Primitives de Synchronisation typescript
🔴 complex
⭐⭐⭐⭐
Implémenter la synchronisation en utilisant Atomics, SharedArrayBuffer et les verrous
⏱️ 40 min
🏷️ typescript, web, multithreading, synchronization
Prerequisites:
Advanced TypeScript, Atomics API, SharedArrayBuffer
// Web TypeScript Synchronization Primitives Examples
// Using Atomics, SharedArrayBuffer, and custom synchronization
// 1. Atomics Operations
/**
 * Static convenience wrappers around the `Atomics` API for `Int32Array`
 * views over a `SharedArrayBuffer`.
 */
class AtomicsOperations {
  /**
   * Allocate a shared buffer and view it as 32-bit integers.
   *
   * @param size - Number of Int32 elements.
   * @returns A zero-filled `Int32Array` backed by a `SharedArrayBuffer`.
   */
  static createSharedBuffer(size: number): Int32Array {
    const buffer = new SharedArrayBuffer(size * Int32Array.BYTES_PER_ELEMENT);
    return new Int32Array(buffer);
  }

  /**
   * Atomically replace the value at `index` with `newValue` if it equals `expectedValue`.
   *
   * @returns `true` when the swap happened, `false` otherwise.
   */
  static compareAndSwap(
    array: Int32Array,
    index: number,
    expectedValue: number,
    newValue: number
  ): boolean {
    return Atomics.compareExchange(array, index, expectedValue, newValue) === expectedValue;
  }

  /**
   * Atomically store `value` at `index`.
   *
   * @returns The value that was there before.
   */
  static exchange(array: Int32Array, index: number, value: number): number {
    return Atomics.exchange(array, index, value);
  }

  /**
   * Atomically add `value`.
   *
   * @returns The value BEFORE the addition.
   */
  static add(array: Int32Array, index: number, value: number): number {
    return Atomics.add(array, index, value);
  }

  /**
   * Atomically subtract `value`.
   *
   * @returns The value BEFORE the subtraction.
   */
  static sub(array: Int32Array, index: number, value: number): number {
    return Atomics.sub(array, index, value);
  }

  /**
   * Atomically add 1.
   *
   * @returns The value BEFORE the increment.
   */
  static increment(array: Int32Array, index: number): number {
    return Atomics.add(array, index, 1);
  }

  /**
   * Atomically subtract 1.
   *
   * @returns The value BEFORE the decrement.
   */
  static decrement(array: Int32Array, index: number): number {
    return Atomics.sub(array, index, 1);
  }

  /** @returns The current value at `index`, read atomically. */
  static load(array: Int32Array, index: number): number {
    return Atomics.load(array, index);
  }

  /** Atomically write `value` at `index`. */
  static store(array: Int32Array, index: number, value: number): void {
    Atomics.store(array, index, value);
  }

  /**
   * Wait, without blocking the thread, until the cell at `index` is notified.
   *
   * `Atomics.waitAsync` returns a result object `{ async, value }` rather
   * than a promise; `value` is either the outcome string (synchronous case)
   * or a promise for it. Both cases are normalized to a promise here.
   *
   * @param array - Shared Int32Array to wait on.
   * @param index - Element to wait on.
   * @param expectedValue - The wait only starts if the cell holds this value.
   * @param timeout - Maximum wait in milliseconds; waits indefinitely when omitted.
   * @returns `'ok'` when woken by a notify, `'not-equal'` when the cell did not
   *   hold `expectedValue`, `'timed-out'` when the timeout elapsed.
   * @throws Rejects when `Atomics.waitAsync` is unavailable or rejects its arguments.
   */
  static waitAsync(array: Int32Array, index: number, expectedValue: number, timeout?: number): Promise<'ok' | 'not-equal' | 'timed-out'> {
    type WaitOutcome = 'ok' | 'not-equal' | 'timed-out';
    type WaitAsyncFn = (
      typedArray: Int32Array,
      index: number,
      value: number,
      timeout?: number
    ) => { async: boolean; value: WaitOutcome | Promise<WaitOutcome> };
    // Accessed through a structural cast because older lib typings lack waitAsync.
    const atomics = Atomics as unknown as { waitAsync?: WaitAsyncFn };
    if (typeof atomics.waitAsync !== 'function') {
      return Promise.reject(new Error('Atomics.waitAsync is not supported in this environment'));
    }
    try {
      return Promise.resolve(atomics.waitAsync(array, index, expectedValue, timeout).value);
    } catch (error) {
      return Promise.reject(error);
    }
  }

  /**
   * Wake waiters on the cell at `index`.
   *
   * @param count - Maximum number of waiters to wake.
   * @returns The number of waiters actually woken.
   */
  static notify(array: Int32Array, index: number, count: number): number {
    return Atomics.notify(array, index, count);
  }
}
// 2. Spinlock
/**
 * Blocking lock built on a single shared Int32 cell (0 = free, 1 = held).
 *
 * `acquire` blocks the calling thread with `Atomics.wait`, which browsers
 * only permit inside workers; on a browser main thread `Atomics.wait` throws
 * a TypeError when the lock is contended.
 */
class Spinlock {
  private locked: Int32Array;

  constructor() {
    this.locked = AtomicsOperations.createSharedBuffer(1);
  }

  /**
   * Block until the lock is obtained.
   *
   * Atomically swaps 1 into the cell; a previous value of 0 means this caller
   * took the lock. Otherwise sleeps until the cell is notified and retries.
   */
  acquire(): void {
    while (Atomics.exchange(this.locked, 0, 1) !== 0) {
      // Returns immediately if the cell is no longer 1, so a release that
      // happens between the exchange and this call is not missed.
      Atomics.wait(this.locked, 0, 1);
    }
  }

  /** Mark the lock free and wake one waiter. */
  release(): void {
    AtomicsOperations.store(this.locked, 0, 0);
    AtomicsOperations.notify(this.locked, 0, 1);
  }
}
// 3. Mutex (Better than Spinlock)
/**
 * Non-blocking mutex built on a single shared Int32 cell (0 = free, 1 = held).
 *
 * Unlike a spinlock, waiting is done with `Atomics.waitAsync`, so the calling
 * thread stays responsive while the lock is contended.
 */
class Mutex {
  private locked: Int32Array;

  constructor() {
    this.locked = AtomicsOperations.createSharedBuffer(1);
  }

  /**
   * Try to take the lock, giving up after `timeoutMs`.
   *
   * @param timeoutMs - Maximum time to wait in milliseconds (default 5000).
   * @returns `true` when the lock was obtained, `false` on timeout.
   */
  async acquire(timeoutMs: number = 5000): Promise<boolean> {
    const deadline = Date.now() + timeoutMs;
    while (true) {
      // compareAndSwap returns a boolean: true means the 0 -> 1 swap succeeded.
      if (AtomicsOperations.compareAndSwap(this.locked, 0, 0, 1)) {
        return true;
      }
      const remaining = deadline - Date.now();
      if (remaining <= 0) {
        return false;
      }
      // Sleep until released, re-checking at least every 100 ms and never
      // past the deadline.
      await AtomicsOperations.waitAsync(this.locked, 0, 1, Math.min(remaining, 100));
    }
  }

  /** Mark the lock free and wake one waiter. */
  release(): void {
    AtomicsOperations.store(this.locked, 0, 0);
    AtomicsOperations.notify(this.locked, 0, 1);
  }

  /**
   * Run `fn` while holding the lock; the lock is released even if `fn` rejects.
   *
   * @param fn - Critical section.
   * @param timeoutMs - Maximum time to wait for the lock (default 5000 ms).
   * @returns Whatever `fn` resolves with.
   * @throws Error('Mutex acquisition timeout') when the lock is not obtained in time.
   */
  async execute<T>(fn: () => Promise<T>, timeoutMs?: number): Promise<T> {
    const acquired = await this.acquire(timeoutMs);
    if (!acquired) {
      throw new Error('Mutex acquisition timeout');
    }
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}
// 4. Semaphore
/**
 * Counting semaphore whose permit count lives in a shared Int32 cell.
 *
 * Waiting uses `Atomics.waitAsync`, so the calling thread is never blocked.
 */
class Semaphore {
  private permits: Int32Array;

  /**
   * @param initialPermits - Number of permits initially available.
   */
  constructor(initialPermits: number) {
    this.permits = AtomicsOperations.createSharedBuffer(1);
    AtomicsOperations.store(this.permits, 0, initialPermits);
  }

  /**
   * Try to take one permit, giving up after `timeoutMs`.
   *
   * @param timeoutMs - Maximum time to wait in milliseconds (default 5000).
   * @returns `true` when a permit was obtained, `false` on timeout.
   */
  async acquire(timeoutMs: number = 5000): Promise<boolean> {
    const deadline = Date.now() + timeoutMs;
    while (true) {
      const current = AtomicsOperations.load(this.permits, 0);
      if (current > 0) {
        // compareAndSwap returns a boolean: true means the decrement was applied.
        if (AtomicsOperations.compareAndSwap(this.permits, 0, current, current - 1)) {
          return true;
        }
        // Another party changed the count first; permits may remain, so retry now.
        continue;
      }
      const remaining = deadline - Date.now();
      if (remaining <= 0) {
        return false;
      }
      // Sleep until a permit is released, re-checking at least every 100 ms.
      await AtomicsOperations.waitAsync(this.permits, 0, current, Math.min(remaining, 100));
    }
  }

  /** Return one permit and wake one waiter. */
  release(): void {
    AtomicsOperations.increment(this.permits, 0);
    AtomicsOperations.notify(this.permits, 0, 1);
  }

  /**
   * Run `fn` while holding a permit; the permit is returned even if `fn` rejects.
   *
   * @param fn - Operation to run under the semaphore.
   * @param timeoutMs - Maximum time to wait for a permit (default 5000 ms).
   * @returns Whatever `fn` resolves with.
   * @throws Error('Semaphore acquisition timeout') when no permit is obtained
   *   in time; `fn` is not run and no permit is released in that case.
   */
  async execute<T>(fn: () => Promise<T>, timeoutMs?: number): Promise<T> {
    const acquired = await this.acquire(timeoutMs);
    if (!acquired) {
      throw new Error('Semaphore acquisition timeout');
    }
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}
// 5. Barrier
/**
 * Reusable barrier: `wait()` resolves once `parties` callers have arrived.
 *
 * `waiting` counts arrivals in the current round; `count` holds the round
 * (generation) number, which the last arrival advances to release the rest.
 */
class Barrier {
  private count: Int32Array;
  private waiting: Int32Array;
  private total: number;

  /**
   * @param parties - Number of `wait()` calls required to open the barrier.
   */
  constructor(parties: number) {
    this.total = parties;
    this.count = AtomicsOperations.createSharedBuffer(1);
    this.waiting = AtomicsOperations.createSharedBuffer(1);
  }

  /**
   * Arrive at the barrier and wait until all parties of this round have arrived.
   *
   * @returns Resolves when the round completes (immediately for the last arrival).
   */
  async wait(): Promise<void> {
    // Read the round number before announcing arrival: it cannot advance
    // until this party has been counted.
    const generation = AtomicsOperations.load(this.count, 0);
    // add() returns the PREVIOUS value, so this party's arrival number is +1.
    const arrived = AtomicsOperations.add(this.waiting, 0, 1) + 1;
    if (arrived >= this.total) {
      // Last party: start a fresh round, then release everyone.
      AtomicsOperations.store(this.waiting, 0, 0);
      AtomicsOperations.add(this.count, 0, 1);
      AtomicsOperations.notify(this.count, 0, this.total);
      return;
    }
    // Loop guards against wake-ups that are not a round change.
    while (AtomicsOperations.load(this.count, 0) === generation) {
      await AtomicsOperations.waitAsync(this.count, 0, generation);
    }
  }

  /**
   * Reset the barrier to an empty round.
   *
   * Parties currently blocked in `wait()` are released, because the round
   * they joined no longer exists.
   */
  reset(): void {
    AtomicsOperations.store(this.waiting, 0, 0);
    AtomicsOperations.add(this.count, 0, 1);
    AtomicsOperations.notify(this.count, 0, this.total);
  }
}
// 6. Read-Write Lock
/**
 * Readers-writer lock on two shared Int32 cells.
 *
 * `readers` counts active readers; `writer` is 1 while a writer holds or is
 * claiming the lock. A writer first claims the flag (so new readers back off)
 * and then waits for active readers to drain, which gives writers preference.
 * All waits use `Atomics.waitAsync` with a polling interval as a safety net
 * against missed notifications.
 */
class ReadWriteLock {
  /** Upper bound, in ms, on a single wait before the state is re-checked. */
  private static readonly POLL_MS = 100;
  private readers: Int32Array;
  private writer: Int32Array;

  constructor() {
    this.readers = AtomicsOperations.createSharedBuffer(1);
    this.writer = AtomicsOperations.createSharedBuffer(1);
  }

  /**
   * Take a shared (read) lock, waiting while a writer is active.
   *
   * @returns Resolves once the read lock is held.
   */
  async acquireReadLock(): Promise<void> {
    while (true) {
      // Wait while a writer holds or is claiming the lock.
      let writerState = AtomicsOperations.load(this.writer, 0);
      while (writerState !== 0) {
        await AtomicsOperations.waitAsync(this.writer, 0, writerState, ReadWriteLock.POLL_MS);
        writerState = AtomicsOperations.load(this.writer, 0);
      }
      // Announce this reader, then re-check: a writer may have claimed the
      // flag between the check above and the increment.
      AtomicsOperations.increment(this.readers, 0);
      if (AtomicsOperations.load(this.writer, 0) === 0) {
        return;
      }
      // Writer became active, back off (same bookkeeping as a normal release).
      this.releaseReadLock();
    }
  }

  /** Release a read lock; the last reader out wakes a writer waiting for readers to drain. */
  releaseReadLock(): void {
    // decrement() returns the PREVIOUS value; 1 means this was the last reader.
    const previous = AtomicsOperations.decrement(this.readers, 0);
    if (previous === 1) {
      // Writers wait on the `readers` cell while readers drain.
      AtomicsOperations.notify(this.readers, 0, 1);
    }
  }

  /**
   * Take the exclusive (write) lock.
   *
   * @returns Resolves once no other writer and no reader holds the lock.
   */
  async acquireWriteLock(): Promise<void> {
    // Claim the writer flag; compareAndSwap returns true when 0 -> 1 succeeded.
    while (!AtomicsOperations.compareAndSwap(this.writer, 0, 0, 1)) {
      await AtomicsOperations.waitAsync(this.writer, 0, 1, ReadWriteLock.POLL_MS);
    }
    // New readers now back off; wait for the active ones to finish.
    let active = AtomicsOperations.load(this.readers, 0);
    while (active !== 0) {
      await AtomicsOperations.waitAsync(this.readers, 0, active, ReadWriteLock.POLL_MS);
      active = AtomicsOperations.load(this.readers, 0);
    }
  }

  /** Release the write lock and wake everything waiting on the writer flag. */
  releaseWriteLock(): void {
    AtomicsOperations.store(this.writer, 0, 0);
    // Several readers (and writers) may be waiting on the flag; wake them all.
    AtomicsOperations.notify(this.writer, 0, Infinity);
  }

  /**
   * Run `fn` under a read lock; the lock is released even if `fn` rejects.
   *
   * @param fn - Read-only operation.
   * @returns Whatever `fn` resolves with.
   */
  async executeRead<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquireReadLock();
    try {
      return await fn();
    } finally {
      this.releaseReadLock();
    }
  }

  /**
   * Run `fn` under the write lock; the lock is released even if `fn` rejects.
   *
   * @param fn - Mutating operation.
   * @returns Whatever `fn` resolves with.
   */
  async executeWrite<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquireWriteLock();
    try {
      return await fn();
    } finally {
      this.releaseWriteLock();
    }
  }
}
// 7. Concurrent Counter
/**
 * Integer counter kept in a single Int32Array cell and updated only through
 * AtomicsOperations helpers.
 *
 * The "…AndGet" / "getAnd…" pairs assume AtomicsOperations.add and
 * AtomicsOperations.sub return the value held before the update.
 */
class ConcurrentCounter {
  private counter: Int32Array;

  /** @param initialValue - Value written into the cell at construction (default 0). */
  constructor(initialValue: number = 0) {
    const cell = AtomicsOperations.createSharedBuffer(1);
    this.counter = cell;
    AtomicsOperations.store(cell, 0, initialValue);
  }

  /** Adds 1 and returns the helper's result plus 1 (the post-increment value). */
  incrementAndGet(): number {
    const before = AtomicsOperations.add(this.counter, 0, 1);
    return before + 1;
  }

  /** Adds 1 and returns the helper's result unchanged (the pre-increment value). */
  getAndIncrement(): number {
    const before = AtomicsOperations.add(this.counter, 0, 1);
    return before;
  }

  /** Subtracts 1 and returns the helper's result minus 1 (the post-decrement value). */
  decrementAndGet(): number {
    const before = AtomicsOperations.sub(this.counter, 0, 1);
    return before - 1;
  }

  /** Subtracts 1 and returns the helper's result unchanged (the pre-decrement value). */
  getAndDecrement(): number {
    const before = AtomicsOperations.sub(this.counter, 0, 1);
    return before;
  }

  /** Reads the current value of the cell. */
  get(): number {
    const current = AtomicsOperations.load(this.counter, 0);
    return current;
  }

  /** Overwrites the cell with `value`. */
  set(value: number): void {
    const { counter } = this;
    AtomicsOperations.store(counter, 0, value);
  }

  /** Adds `value` and returns the helper's result plus `value`. */
  addAndGet(value: number): number {
    const before = AtomicsOperations.add(this.counter, 0, value);
    return before + value;
  }
}
// 8. Producer-Consumer Queue
/**
 * Bounded FIFO queue for async producers and consumers, stored as a ring
 * buffer. `enqueue` waits while the queue is full; `dequeue` waits while it
 * is empty.
 *
 * The items live in a plain array; capacity, item count, head and tail indices
 * are kept in Int32Array cells accessed through AtomicsOperations.
 */
class ProducerConsumerQueue<T> {
  // Slots are `undefined` while unused so dequeued items can be released.
  private buffer: (T | undefined)[];
  private size: Int32Array;
  // Named `itemCount` so it does not collide with the public `count` getter.
  private itemCount: Int32Array;
  private head: Int32Array;
  private tail: Int32Array;

  /**
   * @param capacity - Maximum number of queued items; must be a positive integer.
   * @throws RangeError when `capacity` is not a positive integer (a queue with
   *   no slots could never accept an item).
   */
  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity <= 0) {
      throw new RangeError(
        `ProducerConsumerQueue capacity must be a positive integer, got ${capacity}`
      );
    }
    this.buffer = new Array<T | undefined>(capacity);
    this.size = AtomicsOperations.createSharedBuffer(1);
    this.itemCount = AtomicsOperations.createSharedBuffer(1);
    this.head = AtomicsOperations.createSharedBuffer(1);
    this.tail = AtomicsOperations.createSharedBuffer(1);
    AtomicsOperations.store(this.size, 0, capacity);
  }

  /**
   * Appends `item`, waiting for a free slot if the queue is full.
   * @param item - Value to append at the tail.
   */
  async enqueue(item: T): Promise<void> {
    while (true) {
      const currentCount = AtomicsOperations.load(this.itemCount, 0);
      const capacity = AtomicsOperations.load(this.size, 0);
      if (currentCount < capacity) {
        // Room available: write at the tail and advance it around the ring.
        const tail = AtomicsOperations.load(this.tail, 0);
        this.buffer[tail] = item;
        AtomicsOperations.store(this.tail, 0, (tail + 1) % capacity);
        AtomicsOperations.increment(this.itemCount, 0);
        // Wake one caller parked on the item count (a consumer, if any).
        AtomicsOperations.notify(this.itemCount, 0, 1);
        return;
      }
      // Full: park until the item count moves away from the observed value.
      await AtomicsOperations.waitAsync(this.itemCount, 0, currentCount);
    }
  }

  /**
   * Removes and returns the oldest item, waiting if the queue is empty.
   * @returns The item at the head of the queue.
   */
  async dequeue(): Promise<T> {
    while (true) {
      const currentCount = AtomicsOperations.load(this.itemCount, 0);
      if (currentCount > 0) {
        // Item available: read at the head and advance it around the ring.
        const head = AtomicsOperations.load(this.head, 0);
        const capacity = AtomicsOperations.load(this.size, 0);
        const item = this.buffer[head];
        // Drop the reference so the queue does not keep the item alive.
        this.buffer[head] = undefined;
        AtomicsOperations.store(this.head, 0, (head + 1) % capacity);
        AtomicsOperations.decrement(this.itemCount, 0);
        // Wake one caller parked on the item count (a producer, if any).
        AtomicsOperations.notify(this.itemCount, 0, 1);
        // The slot was filled by enqueue, so it holds a T (which may itself
        // legitimately be undefined when T allows it).
        return item as T;
      }
      // Empty: park until the item count moves away from the observed value.
      await AtomicsOperations.waitAsync(this.itemCount, 0, currentCount);
    }
  }

  /** Number of items currently queued. */
  get count(): number {
    return AtomicsOperations.load(this.itemCount, 0);
  }
}
// Usage Examples
/**
 * Walks through the synchronization primitives one after another and logs
 * what each one does to the console: raw atomics, the counter, the mutex,
 * the semaphore, the barrier and the producer-consumer queue.
 */
async function demonstrateSynchronization(): Promise<void> {
  console.log('=== Web TypeScript Synchronization Primitives Examples ===\n');

  // 1. Atomics operations: fill five cells with multiples of ten, then add to the first.
  console.log('--- 1. Atomics Operations ---');
  const buffer = AtomicsOperations.createSharedBuffer(5);
  for (let slot = 0; slot < buffer.length; slot += 1) {
    AtomicsOperations.store(buffer, slot, slot * 10);
  }
  console.log('Buffer values:');
  buffer.forEach((value, slot) => {
    console.log(` [${slot}] = ${value}`);
  });
  AtomicsOperations.add(buffer, 0, 5);
  console.log(`After add to [0]: ${buffer[0]}`);

  // 2. Concurrent counter: ten increments, each after its own random delay.
  console.log('\n--- 2. Concurrent Counter ---');
  const counter = new ConcurrentCounter(0);
  const bumpAfterRandomDelay = async (): Promise<void> => {
    await PromiseBasics.delay(Math.random() * 100);
    counter.incrementAndGet();
  };
  const incrementTasks = Array.from({ length: 10 }, () => bumpAfterRandomDelay());
  await Promise.all(incrementTasks);
  console.log(`Final counter value: ${counter.get()}`);

  // 3. Mutex: five read-modify-write tasks on one variable, run under the mutex.
  console.log('\n--- 3. Mutex ---');
  const mutex = new Mutex();
  let sharedResource = 0;
  const guardedUpdate = (taskId: number) =>
    mutex.execute(async () => {
      const snapshot = sharedResource;
      console.log(`Task ${taskId} read: ${snapshot}`);
      await PromiseBasics.delay(50);
      sharedResource = snapshot + 1;
      console.log(`Task ${taskId} wrote: ${sharedResource}`);
      return sharedResource;
    });
  const mutexTasks = Array.from({ length: 5 }, (_, taskId) => guardedUpdate(taskId));
  await Promise.all(mutexTasks);
  console.log(`Final shared resource value: ${sharedResource}`);

  // 4. Semaphore: ten tasks behind a semaphore created with 2 permits; track the peak.
  console.log('\n--- 4. Semaphore ---');
  const semaphore = new Semaphore(2);
  let activeCount = 0;
  let maxActive = 0;
  const limitedTask = (taskId: number) =>
    semaphore.execute(async () => {
      activeCount += 1;
      maxActive = Math.max(maxActive, activeCount);
      console.log(`Task ${taskId} started (active: ${activeCount})`);
      await PromiseBasics.delay(Math.random() * 200);
      activeCount -= 1;
      console.log(`Task ${taskId} completed`);
    });
  const semaphoreTasks = Array.from({ length: 10 }, (_, taskId) => limitedTask(taskId));
  await Promise.all(semaphoreTasks);
  console.log(`Max concurrent tasks: ${maxActive}`);

  // 5. Barrier: three tasks meet at a barrier built for three parties.
  console.log('\n--- 5. Barrier ---');
  const barrier = new Barrier(3);
  const meetAtBarrier = async (taskId: number): Promise<void> => {
    await barrier.wait();
    console.log(`Task ${taskId} passed barrier`);
  };
  const barrierTasks = Array.from({ length: 3 }, (_, taskId) => meetAtBarrier(taskId));
  await Promise.all(barrierTasks);

  // 6. Producer-consumer: one item enqueued now, two more on timers; two are dequeued.
  console.log('\n--- 6. Producer-Consumer Queue ---');
  const queue = new ProducerConsumerQueue<number>(5);
  // Producer side is fire-and-forget; the enqueue promises are not awaited.
  void queue.enqueue(1);
  setTimeout(() => {
    void queue.enqueue(2);
  }, 100);
  setTimeout(() => {
    void queue.enqueue(3);
  }, 200);
  // Consumer side.
  const item1 = await queue.dequeue();
  console.log(`Dequeued: ${item1}`);
  const item2 = await queue.dequeue();
  console.log(`Dequeued: ${item2}`);
  console.log(`Queue count: ${queue.count}`);

  console.log('\n=== All Synchronization Primitives Examples Completed ===');
}
// Local PromiseBasics helper (defined here rather than imported); provides the delay() used by the demo above
/** Small promise helpers used by the demo code. */
class PromiseBasics {
  /**
   * Creates a promise that resolves (with no value) after a timer of `ms`
   * milliseconds fires.
   *
   * @param ms - Delay passed to `setTimeout`.
   * @returns A promise settled by the timer callback.
   */
  static delay(ms: number): Promise<void> {
    return new Promise<void>((done) => {
      setTimeout(done, ms);
    });
  }
}
// Export classes
export { AtomicsOperations, Spinlock, Mutex, Semaphore, Barrier, ReadWriteLock, ConcurrentCounter, ProducerConsumerQueue };
export { demonstrateSynchronization };