🎯 Empfohlene Sammlungen
Balanced sample collections from various categories for you to explore
Web TypeScript Multithreading-Beispiele
Web TypeScript Multithreading-Beispiele einschließlich Web Workers, asynchrone Operationen und parallele Verarbeitung
💻 Web Workers typescript
🟡 intermediate
⭐⭐⭐
Web Workers erstellen und verwenden für parallele Ausführung im Browser
⏱️ 30 min
🏷️ typescript, web, multithreading
Prerequisites:
Intermediate TypeScript, Web Workers API
// Web TypeScript Web Workers Examples
// Using Web Workers for parallel execution and background processing
// 1. Basic Web Worker Creation
/**
 * Registry that creates, looks up, and terminates Web Workers by name.
 *
 * Inline workers are backed by blob URLs. Those URLs are remembered and
 * revoked when the worker is terminated so the blob can be garbage collected.
 */
class WorkerManager {
  private workers: Map<string, Worker> = new Map();
  // Blob URL backing each inline worker, keyed by the worker instance.
  private objectUrls: Map<Worker, string> = new Map();

  /**
   * Create a worker from a string of JavaScript source.
   * @param name Key under which the worker is registered (an existing entry is replaced).
   * @param workerCode JavaScript source executed inside the worker.
   * @returns The newly created worker.
   */
  createInlineWorker(name: string, workerCode: string): Worker {
    const blob = new Blob([workerCode], { type: 'application/javascript' });
    const workerUrl = URL.createObjectURL(blob);
    let worker: Worker;
    try {
      worker = new Worker(workerUrl);
    } catch (error) {
      // Construction failed, so nothing will ever use the URL.
      URL.revokeObjectURL(workerUrl);
      throw error;
    }
    this.objectUrls.set(worker, workerUrl);
    this.workers.set(name, worker);
    return worker;
  }

  /**
   * Create a worker from an external script file.
   * @param name Key under which the worker is registered.
   * @param workerFile URL of the worker script.
   */
  createWorker(name: string, workerFile: string): Worker {
    const worker = new Worker(workerFile);
    this.workers.set(name, worker);
    return worker;
  }

  /** Look up a registered worker; returns `undefined` when the name is unknown. */
  getWorker(name: string): Worker | undefined {
    return this.workers.get(name);
  }

  /** Terminate and unregister the named worker. Unknown names are ignored. */
  terminateWorker(name: string): void {
    const worker = this.workers.get(name);
    if (worker) {
      worker.terminate();
      this.releaseObjectUrl(worker);
      this.workers.delete(name);
    }
  }

  /** Terminate every registered worker and release all tracked blob URLs. */
  terminateAllWorkers(): void {
    for (const worker of this.workers.values()) {
      worker.terminate();
    }
    this.workers.clear();
    // Also covers inline workers whose registry entry was replaced by a later
    // registration under the same name.
    for (const url of this.objectUrls.values()) {
      URL.revokeObjectURL(url);
    }
    this.objectUrls.clear();
  }

  /** Revoke the blob URL that backs an inline worker, if there is one. */
  private releaseObjectUrl(worker: Worker): void {
    const url = this.objectUrls.get(worker);
    if (url !== undefined) {
      URL.revokeObjectURL(url);
      this.objectUrls.delete(worker);
    }
  }
}
// 2. Worker Communication
/**
 * Thin wrapper around a Worker offering fire-and-forget messaging, handler
 * registration, and a promise-based request/response helper.
 */
class WorkerCommunication {
  private worker: Worker;

  constructor(worker: Worker) {
    this.worker = worker;
  }

  /**
   * Post a message to the worker.
   * @param message Payload to send.
   * @param transfer Optional transferable objects whose ownership moves to the worker.
   */
  postMessage(message: any, transfer?: Transferable[]): void {
    if (transfer) {
      this.worker.postMessage(message, transfer);
    } else {
      this.worker.postMessage(message);
    }
  }

  /** Install `handler` as the worker's `onmessage` callback (replaces any previous one). */
  onMessage(handler: (message: MessageEvent) => void): void {
    this.worker.onmessage = handler;
  }

  /** Install `handler` as the worker's `onerror` callback (replaces any previous one). */
  onError(handler: (error: ErrorEvent) => void): void {
    this.worker.onerror = handler;
  }

  /**
   * Post a message and resolve with the data of the next message event, or
   * reject on the next error event.
   *
   * Both temporary listeners are removed as soon as the promise settles, so
   * repeated calls do not accumulate listeners on the worker.
   *
   * @returns The `data` of the first message received after posting.
   */
  async sendMessage(message: any): Promise<any> {
    return new Promise((resolve, reject) => {
      const cleanup = () => {
        this.worker.removeEventListener('message', handleMessage);
        this.worker.removeEventListener('error', handleError);
      };
      const handleMessage = (e: MessageEvent) => {
        cleanup();
        resolve(e.data);
      };
      const handleError = (e: ErrorEvent) => {
        cleanup();
        // `error` may be absent on the event; fall back to its message so
        // callers never receive an `undefined` rejection reason.
        reject(e.error ?? new Error(e.message));
      };
      this.worker.addEventListener('message', handleMessage);
      this.worker.addEventListener('error', handleError);
      try {
        this.worker.postMessage(message);
      } catch (error) {
        // e.g. the payload could not be cloned; do not leave listeners behind.
        cleanup();
        reject(error);
      }
    });
  }
}
// 3. Worker Task Executor
/**
 * Runs one-shot computations in short-lived inline workers.
 *
 * Every task gets its own worker registered under a unique name, and that
 * worker is always terminated once its task settles, whether it succeeded
 * or failed.
 */
class WorkerTaskExecutor {
  private workerManager: WorkerManager;
  // Monotonic counter used to build unique worker names, so concurrent calls
  // and equal inputs never share a registry entry.
  private nextWorkerId = 0;

  constructor() {
    this.workerManager = new WorkerManager();
  }

  /**
   * Run the built-in sample computation (doubles `input.value`) in a worker.
   * @returns The object posted back by the worker, `{ result: number }`.
   */
  async executeComputation(input: any): Promise<any> {
    const workerCode = `
      self.onmessage = function(e) {
        const input = e.data;
        const result = performComputation(input);
        self.postMessage(result);
      };
      function performComputation(input) {
        // Perform computation here
        return { result: input.value * 2 };
      }
    `;
    return this.runInFreshWorker('computation', workerCode, input);
  }

  /**
   * Run `workerCode` once per input, each in its own worker, concurrently.
   * @param inputs Payloads; one worker is spawned per element.
   * @param workerCode JavaScript source executed in every worker.
   * @returns Results in the same order as `inputs`.
   */
  async executeParallel(inputs: any[], workerCode: string): Promise<any[]> {
    return Promise.all(
      inputs.map((input) => this.runInFreshWorker('worker', workerCode, input))
    );
  }

  /** Spawn a uniquely named worker, send it one message, and always terminate it afterwards. */
  private async runInFreshWorker(prefix: string, workerCode: string, input: any): Promise<any> {
    const name = `${prefix}-${this.nextWorkerId++}`;
    const worker = this.workerManager.createInlineWorker(name, workerCode);
    try {
      return await new WorkerCommunication(worker).sendMessage(input);
    } finally {
      this.workerManager.terminateWorker(name);
    }
  }
}
// 4. Worker Pool
/**
 * Fixed-size pool of inline workers that processes queued tasks.
 *
 * Each worker handles one task at a time. The task a worker is currently
 * processing is tracked per worker, so replies are always routed to the
 * promise of the task that was actually sent to that worker.
 */
class WorkerPool {
  private workers: Worker[] = [];
  private taskQueue: Array<{ task: any; resolve: (value: any) => void; reject: (reason: any) => void }> = [];
  private availableWorkers: Worker[] = [];
  // Task currently being processed by each busy worker.
  private activeTasks: Map<Worker, { task: any; resolve: (value: any) => void; reject: (reason: any) => void }> = new Map();
  // Single blob URL shared by all workers; revoked on terminate().
  private workerUrl: string | null = null;
  private terminated = false;

  /**
   * @param workerCode JavaScript source run by every worker in the pool.
   * @param poolSize Number of workers to create; must be a positive integer.
   * @throws RangeError when `poolSize` is not a positive integer.
   */
  constructor(workerCode: string, poolSize: number) {
    if (!Number.isInteger(poolSize) || poolSize < 1) {
      throw new RangeError('poolSize must be a positive integer');
    }
    const blob = new Blob([workerCode], { type: 'application/javascript' });
    const workerUrl = URL.createObjectURL(blob);
    this.workerUrl = workerUrl;
    for (let i = 0; i < poolSize; i++) {
      const worker = new Worker(workerUrl);
      worker.onmessage = (e) => {
        const task = this.activeTasks.get(worker);
        if (task) {
          this.activeTasks.delete(worker);
          task.resolve(e.data);
          this.releaseWorker(worker);
        }
      };
      worker.onerror = (e) => {
        const task = this.activeTasks.get(worker);
        if (task) {
          this.activeTasks.delete(worker);
          task.reject(e.error ?? new Error(e.message));
          this.releaseWorker(worker);
        }
      };
      this.workers.push(worker);
      this.availableWorkers.push(worker);
    }
  }

  /**
   * Queue a task. It is posted to the next idle worker.
   * @returns A promise settled with the worker's reply data, or rejected on
   *          a worker error or when the pool is (or gets) terminated.
   */
  async execute(task: any): Promise<any> {
    return new Promise((resolve, reject) => {
      if (this.terminated) {
        reject(new Error('Worker pool has been terminated'));
        return;
      }
      this.taskQueue.push({ task, resolve, reject });
      this.processQueue();
    });
  }

  /** Hand queued tasks to idle workers until one of the two runs out. */
  private processQueue(): void {
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const taskItem = this.taskQueue.shift()!;
      const worker = this.availableWorkers.shift()!;
      this.activeTasks.set(worker, taskItem);
      try {
        worker.postMessage(taskItem.task);
      } catch (error) {
        // The payload could not be posted: fail this task and keep the worker usable.
        this.activeTasks.delete(worker);
        this.availableWorkers.push(worker);
        taskItem.reject(error);
      }
    }
  }

  /** Mark a worker idle again and dispatch the next queued task, if any. */
  private releaseWorker(worker: Worker): void {
    if (this.terminated) {
      return;
    }
    this.availableWorkers.push(worker);
    this.processQueue();
  }

  /**
   * Terminate all workers. Tasks that are still running or queued are
   * rejected, so callers are never left awaiting a promise that cannot settle.
   */
  terminate(): void {
    this.terminated = true;
    this.workers.forEach(worker => worker.terminate());
    const abandoned = [...this.activeTasks.values(), ...this.taskQueue];
    this.workers = [];
    this.availableWorkers = [];
    this.taskQueue = [];
    this.activeTasks.clear();
    if (this.workerUrl !== null) {
      URL.revokeObjectURL(this.workerUrl);
      this.workerUrl = null;
    }
    for (const item of abandoned) {
      item.reject(new Error('Worker pool has been terminated'));
    }
  }
}
// 5. Shared Worker
/**
 * Holds a single SharedWorker built from inline source and exposes helpers
 * that operate on its message port.
 */
class SharedWorkerManager {
  private sharedWorker: SharedWorker | null = null;

  /**
   * Build a SharedWorker from source code and remember it.
   * @param name Name passed to the SharedWorker constructor.
   * @param workerCode JavaScript source for the worker.
   */
  createSharedWorker(name: string, workerCode: string): SharedWorker {
    const source = new Blob([workerCode], { type: 'application/javascript' });
    const instance = new SharedWorker(URL.createObjectURL(source), name);
    this.sharedWorker = instance;
    return instance;
  }

  /**
   * Return the port of the current shared worker.
   * @throws Error when no shared worker has been created yet.
   */
  connect(): MessagePort {
    const current = this.sharedWorker;
    if (!current) {
      throw new Error('Shared worker not created');
    }
    return current.port;
  }

  /** Start the port of the current shared worker. */
  start(): void {
    this.connect().start();
  }

  /** Post a message through the shared worker's port. */
  postMessage(message: any): void {
    this.connect().postMessage(message);
  }

  /** Install `handler` as the port's `onmessage` callback. */
  onMessage(handler: (message: MessageEvent) => void): void {
    this.connect().onmessage = handler;
  }
}
// 6. Worker with Progress Tracking
/**
 * Runs a task in a dedicated inline worker and reports progress.
 *
 * The worker is expected to post messages shaped like
 * `{ type: 'progress', value }`, `{ type: 'complete', result }` or
 * `{ type: 'error', message }`.
 */
class WorkerWithProgress {
  private worker: Worker;
  private communication: WorkerCommunication;

  /** @param workerCode JavaScript source executed inside the worker. */
  constructor(workerCode: string) {
    const blob = new Blob([workerCode], { type: 'application/javascript' });
    const workerUrl = URL.createObjectURL(blob);
    this.worker = new Worker(workerUrl);
    this.communication = new WorkerCommunication(this.worker);
  }

  /**
   * Send `task` to the worker and wait for its completion message.
   * @param task Payload posted to the worker.
   * @param onProgress Called with `value` for every `progress` message.
   * @returns The `result` field of the `complete` message.
   * @throws Error carrying the worker's message on an `error` message, or
   *         the worker's error when its error event fires.
   */
  async executeWithProgress(
    task: any,
    onProgress: (progress: number) => void
  ): Promise<any> {
    return new Promise((resolve, reject) => {
      // Handlers must be registered by CALLING onMessage/onError. Assigning to
      // those members would only replace the methods on the wrapper and never
      // attach anything to the worker.
      this.communication.onMessage((e: MessageEvent) => {
        const data = e.data;
        if (data.type === 'progress') {
          onProgress(data.value);
        } else if (data.type === 'complete') {
          resolve(data.result);
        } else if (data.type === 'error') {
          reject(new Error(data.message));
        }
      });
      this.communication.onError((e: ErrorEvent) => {
        reject(e.error ?? new Error(e.message));
      });
      this.communication.postMessage(task);
    });
  }

  /** Terminate the underlying worker. */
  terminate(): void {
    this.worker.terminate();
  }
}
// 7. Batch Processing with Workers
/**
 * Processes a list of items in fixed-size batches, one short-lived worker
 * per batch, with the batches run sequentially.
 */
class BatchWorkerProcessor {
  private workerManager = new WorkerManager();
  // Counter for unique worker names so overlapping calls never share an entry.
  private nextBatchId = 0;

  /**
   * Process `items` batch by batch.
   * @param items Items to process.
   * @param processorCode JavaScript source defining `processItem(item)`; it is
   *        embedded at the top of the generated worker script.
   * @param batchSize Number of items per batch; must be a positive integer.
   * @param onProgress Called after each batch with (items processed so far, total).
   * @returns Results in the same order as `items`.
   * @throws RangeError when `batchSize` is not a positive integer.
   */
  async processBatch<T, R>(
    items: T[],
    processorCode: string,
    batchSize: number = 10,
    onProgress?: (current: number, total: number) => void
  ): Promise<R[]> {
    // A zero or negative step would never advance the slicing loop below.
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new RangeError('batchSize must be a positive integer');
    }
    const results: R[] = [];
    for (let start = 0; start < items.length; start += batchSize) {
      const batch = items.slice(start, start + batchSize);
      const batchResults = await this.processBatchImpl<T, R>(batch, processorCode);
      results.push(...batchResults);
      if (onProgress) {
        onProgress(Math.min(start + batchSize, items.length), items.length);
      }
    }
    return results;
  }

  /** Run one batch in a fresh worker, which is terminated even when the batch fails. */
  private async processBatchImpl<T, R>(batch: T[], processorCode: string): Promise<R[]> {
    const workerCode = `
      ${processorCode}
      self.onmessage = function(e) {
        const batch = e.data;
        const results = batch.map(processItem);
        self.postMessage(results);
      };
    `;
    const name = `batch-${this.nextBatchId++}`;
    const worker = this.workerManager.createInlineWorker(name, workerCode);
    try {
      return await new WorkerCommunication(worker).sendMessage(batch);
    } finally {
      this.workerManager.terminateWorker(name);
    }
  }
}
// 8. Worker Utilities
/**
 * Static helpers for feature detection and for running a plain function
 * inside a throwaway worker.
 */
class WorkerUtils {
  /** True when the `Worker` global exists. */
  static isSupported(): boolean {
    return typeof Worker !== 'undefined';
  }

  /** True when the `SharedWorker` global exists. */
  static isSharedWorkerSupported(): boolean {
    return typeof SharedWorker !== 'undefined';
  }

  /**
   * Build a worker whose message handler applies `fn` to the incoming data
   * and posts back the return value. The function is serialised with
   * `toString()` into the worker script.
   */
  static createWorkerFromFunction(fn: Function): Worker {
    const script = [
      '',
      `const workerFunction = ${fn.toString()};`,
      'self.onmessage = (e) => {',
      'const result = workerFunction(e.data);',
      'self.postMessage(result);',
      '};',
      '',
    ].join('\n');
    const scriptBlob = new Blob([script], { type: 'application/javascript' });
    return new Worker(URL.createObjectURL(scriptBlob));
  }

  /**
   * Run `fn(input)` in a fresh worker.
   * @returns Resolves with the first message's data; rejects with the error
   *          event's `error`. The worker is terminated in both cases.
   */
  static async executeInWorker<T, R>(fn: (input: T) => R, input: T): Promise<R> {
    const runner = this.createWorkerFromFunction(fn);
    return new Promise((resolve, reject) => {
      runner.onerror = (event) => {
        reject(event.error);
        runner.terminate();
      };
      runner.onmessage = (event) => {
        resolve(event.data as R);
        runner.terminate();
      };
      runner.postMessage(input);
    });
  }
}
// Usage Examples
/**
 * Walks through every Web Worker example in this module, logging the results
 * to the console. Needs an environment that provides `Worker`, `Blob` and
 * `URL.createObjectURL`.
 */
async function demonstrateWebWorkers() {
  console.log('=== Web TypeScript Web Workers Examples ===\n');

  // Check support
  console.log('--- Web Workers Support ---');
  console.log(`Web Workers supported: ${WorkerUtils.isSupported()}`);
  console.log(`Shared Workers supported: ${WorkerUtils.isSharedWorkerSupported()}`);

  // 1. Basic worker
  console.log('\n--- 1. Basic Web Worker ---');
  const workerManager = new WorkerManager();
  const basicWorkerCode = `
    self.onmessage = function(e) {
      const data = e.data;
      console.log('Worker received:', data);
      const result = data.value * 2;
      self.postMessage({ result: result });
    };
  `;
  const worker = workerManager.createInlineWorker('basic', basicWorkerCode);
  const comm = new WorkerCommunication(worker);
  // Register the handler by calling onMessage; assigning to it would replace
  // the method and never attach anything to the worker.
  comm.onMessage((e) => {
    console.log(`Worker response: ${JSON.stringify(e.data)}`);
  });
  comm.postMessage({ value: 21 });
  // Give the worker a moment to respond before terminating it
  await new Promise(resolve => setTimeout(resolve, 100));
  workerManager.terminateWorker('basic');

  // 2. Execute computation in worker
  console.log('\n--- 2. Execute Computation ---');
  const executor = new WorkerTaskExecutor();
  const computationResult = await executor.executeComputation({ value: 10 });
  console.log(`Computation result: ${JSON.stringify(computationResult)}`);

  // 3. Parallel execution
  console.log('\n--- 3. Parallel Execution ---');
  const parallelWorkerCode = `
    self.onmessage = function(e) {
      const input = e.data;
      const result = input * input; // Square the input
      self.postMessage(result);
    };
  `;
  const inputs = [1, 2, 3, 4, 5];
  const parallelResults = await executor.executeParallel(inputs, parallelWorkerCode);
  console.log(`Parallel results: ${parallelResults.join(', ')}`);

  // 4. Worker with progress
  console.log('\n--- 4. Worker with Progress ---');
  const progressWorkerCode = `
    self.onmessage = function(e) {
      const task = e.data;
      // Simulate progress
      for (let i = 0; i <= 100; i += 10) {
        self.postMessage({ type: 'progress', value: i });
      }
      self.postMessage({ type: 'complete', result: task.input * 2 });
    };
  `;
  const progressWorker = new WorkerWithProgress(progressWorkerCode);
  const progressResult = await progressWorker.executeWithProgress(
    { input: 15 },
    (progress) => console.log(`Progress: ${progress}%`)
  );
  // executeWithProgress already resolves with the `result` field of the
  // completion message, so the value is logged directly.
  console.log(`Result with progress: ${progressResult}`);
  progressWorker.terminate();

  // 5. Batch processing
  console.log('\n--- 5. Batch Processing ---');
  const batchProcessor = new BatchWorkerProcessor();
  const processorCode = `
    function processItem(item) {
      return item * item;
    }
  `;
  const items = Array.from({ length: 25 }, (_, i) => i + 1);
  const batchResults = await batchProcessor.processBatch(
    items,
    processorCode,
    5,
    (current, total) => console.log(`Batch progress: ${current}/${total}`)
  );
  console.log(`Batch results (first 10): ${batchResults.slice(0, 10).join(', ')}...`);

  // 6. Execute function in worker
  console.log('\n--- 6. Execute Function in Worker ---');
  const squareResult = await WorkerUtils.executeInWorker(
    (x: number) => x * x,
    7
  );
  console.log(`Square result: ${squareResult}`);

  // 7. Heavy computation
  console.log('\n--- 7. Heavy Computation ---');
  const heavyWorkerCode = `
    self.onmessage = function(e) {
      const n = e.data.n;
      let sum = 0;
      for (let i = 1; i <= n; i++) {
        sum += i;
      }
      self.postMessage({ sum: sum });
    };
  `;
  const heavyWorker = workerManager.createInlineWorker('heavy', heavyWorkerCode);
  const heavyComm = new WorkerCommunication(heavyWorker);
  const heavyResult = await heavyComm.sendMessage({ n: 1000000 });
  console.log(`Heavy computation result: Sum from 1 to 1000000 = ${heavyResult.sum}`);
  workerManager.terminateWorker('heavy');

  console.log('\n=== All Web Workers Examples Completed ===');
}
// Export classes
export { WorkerManager, WorkerCommunication, WorkerTaskExecutor, WorkerPool, SharedWorkerManager, WorkerWithProgress, BatchWorkerProcessor, WorkerUtils };
export { demonstrateWebWorkers };
💻 Asynchrone Operationen typescript
🟡 intermediate
⭐⭐⭐
Async/Await-Muster, Promises und gleichzeitige Ausführung in TypeScript meistern
⏱️ 25 min
🏷️ typescript, web, multithreading, async
Prerequisites:
Intermediate TypeScript, Async/Await
// Web TypeScript Async/Await Operations Examples
// Mastering Promises, async/await, and concurrent execution patterns
// 1. Promise Basics
/**
 * Small promise helpers: creation, callback adaptation, delay, timeout, retry.
 */
class PromiseBasics {
  /** Return a promise resolved with `value`. */
  static resolve<T>(value: T): Promise<T> {
    return Promise.resolve(value);
  }

  /** Return a promise rejected with `error`. */
  static reject<T>(error: T): Promise<never> {
    return Promise.reject(error);
  }

  /**
   * Adapt a Node-style callback API to a promise.
   * @param callback Receives a `(error, result)` completion function; a
   *        truthy `error` rejects, otherwise `result` resolves.
   */
  static fromCallback<T>(
    callback: (cb: (error: Error | null, result: T) => void) => void
  ): Promise<T> {
    return new Promise((resolve, reject) => {
      callback((error, result) => {
        if (error) {
          reject(error);
        } else {
          resolve(result);
        }
      });
    });
  }

  /** Resolve after `ms` milliseconds. */
  static delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Race `promise` against a timer.
   * @param timeoutMs Milliseconds before the returned promise rejects.
   * @param errorMessage Message of the timeout Error.
   * @returns Settles like `promise` if it settles first; otherwise rejects
   *          with `new Error(errorMessage)`.
   */
  static withTimeout<T>(promise: Promise<T>, timeoutMs: number, errorMessage: string = 'Timeout'): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(() => reject(new Error(errorMessage)), timeoutMs);
    });
    // Always clear the timer so a promise that settles early does not leave a
    // pending timeout behind.
    return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
  }

  /**
   * Call `fn` until it succeeds or `maxAttempts` is reached.
   * Waits `delayMs * attempt` between attempts (linear backoff).
   * @throws RangeError when `maxAttempts` is not a positive integer.
   * @throws The error of the last failed attempt when every attempt fails.
   */
  static async retry<T>(
    fn: () => Promise<T>,
    maxAttempts: number = 3,
    delayMs: number = 1000
  ): Promise<T> {
    // With zero attempts there would be no error to rethrow.
    if (!Number.isInteger(maxAttempts) || maxAttempts < 1) {
      throw new RangeError('maxAttempts must be a positive integer');
    }
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return await fn();
      } catch (error) {
        lastError = error;
        if (attempt < maxAttempts) {
          await this.delay(delayMs * attempt);
        }
      }
    }
    throw lastError;
  }
}
// 2. Async Control Flow
/**
 * Strategies for running asynchronous tasks: sequential, bounded-parallel,
 * batched, with fallback, and behind a circuit breaker.
 */
class AsyncControlFlow {
  /** Run tasks one after another; results are in task order. */
  static async sequential<T>(tasks: Array<() => Promise<T>>): Promise<T[]> {
    const results: T[] = [];
    for (const task of tasks) {
      const result = await task();
      results.push(result);
    }
    return results;
  }

  /**
   * Run tasks with at most `limit` of them in flight at any time.
   * @param tasks Task factories; each is invoked exactly once unless an
   *        earlier task has already failed.
   * @param limit Maximum concurrency; must be at least 1 (fractions are floored).
   * @returns Results in the same order as `tasks`.
   * @throws RangeError when `limit` is below 1 or not a number.
   */
  static async parallelWithLimit<T>(
    tasks: Array<() => Promise<T>>,
    limit: number
  ): Promise<T[]> {
    if (!(limit >= 1)) {
      throw new RangeError('limit must be at least 1');
    }
    const results: T[] = new Array(tasks.length);
    let nextIndex = 0;
    let failed = false;
    // Each lane pulls the next unstarted task as soon as its current one
    // finishes, so the number of lanes is the concurrency bound.
    const runLane = async (): Promise<void> => {
      while (!failed && nextIndex < tasks.length) {
        const index = nextIndex++;
        try {
          results[index] = await tasks[index]();
        } catch (error) {
          failed = true; // stop other lanes from starting further tasks
          throw error;
        }
      }
    };
    const laneCount = Math.min(Math.floor(limit), tasks.length);
    await Promise.all(Array.from({ length: laneCount }, () => runLane()));
    return results;
  }

  /**
   * Process items in consecutive batches; items within a batch run in
   * parallel, and batches run one after another.
   * @throws RangeError when `batchSize` is not a positive integer.
   */
  static async batchParallel<T>(
    items: T[],
    processor: (item: T) => Promise<any>,
    batchSize: number
  ): Promise<any[]> {
    // A zero or negative step would never advance the loop below.
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new RangeError('batchSize must be a positive integer');
    }
    const results: any[] = [];
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize);
      // Wrap the call so Array.prototype.map's extra (index, array) arguments
      // are not forwarded to the processor.
      const batchResults = await Promise.all(batch.map((item) => processor(item)));
      results.push(...batchResults);
    }
    return results;
  }

  /** Run `primary`; if it rejects, run `fallback` instead. */
  static async withFallback<T>(
    primary: () => Promise<T>,
    fallback: () => Promise<T>
  ): Promise<T> {
    try {
      return await primary();
    } catch {
      return await fallback();
    }
  }

  /**
   * Wrap `fn` in a circuit breaker.
   *
   * Every call is subject to `timeout`. After `maxFailures` consecutive
   * failures the circuit opens and calls fail immediately with
   * 'Circuit breaker is open' until `resetTimeout` ms have passed since the
   * last failure; then the failure count is reset and calls go through again.
   *
   * @param options.timeout Per-call timeout in ms (default 5000).
   * @param options.maxFailures Consecutive failures that open the circuit (default 5).
   * @param options.resetTimeout Ms to keep the circuit open (default 60000).
   */
  static withCircuitBreaker<T>(
    fn: () => Promise<T>,
    options: {
      timeout?: number;
      maxFailures?: number;
      resetTimeout?: number;
    } = {}
  ): () => Promise<T> {
    let failures = 0;
    let lastFailureTime = 0;
    let circuitOpen = false;
    const {
      timeout = 5000,
      maxFailures = 5,
      resetTimeout = 60000
    } = options;
    return async () => {
      if (circuitOpen) {
        if (Date.now() - lastFailureTime > resetTimeout) {
          circuitOpen = false;
          failures = 0;
        } else {
          throw new Error('Circuit breaker is open');
        }
      }
      try {
        const result = await PromiseBasics.withTimeout(fn(), timeout);
        failures = 0;
        return result;
      } catch (error) {
        failures++;
        lastFailureTime = Date.now();
        if (failures >= maxFailures) {
          circuitOpen = true;
        }
        throw error;
      }
    };
  }
}
// 3. Async Utilities
/**
 * Promise-aware counterparts of common array helpers.
 * `map` and `filter` start all callbacks at once; `reduce`, `find`, `some`
 * and `every` await their callbacks one element at a time.
 */
class AsyncUtilities {
  /** Apply `mapper` to every element concurrently; results keep input order. */
  static async map<T, R>(
    array: T[],
    mapper: (item: T, index: number) => Promise<R>
  ): Promise<R[]> {
    return Promise.all(array.map((element, position) => mapper(element, position)));
  }

  /** Keep the elements whose predicate resolves truthy; predicates run concurrently. */
  static async filter<T>(
    array: T[],
    predicate: (item: T, index: number) => Promise<boolean>
  ): Promise<T[]> {
    const verdicts = await Promise.all(
      array.map(async (element, position) => {
        const keep = await predicate(element, position);
        return { element, keep };
      })
    );
    const kept: T[] = [];
    for (const verdict of verdicts) {
      if (verdict.keep) {
        kept.push(verdict.element);
      }
    }
    return kept;
  }

  /** Fold the array left to right, awaiting the reducer at each step. */
  static async reduce<T, R>(
    array: T[],
    reducer: (accumulator: R, item: T, index: number) => Promise<R>,
    initialValue: R
  ): Promise<R> {
    let running = initialValue;
    let position = 0;
    while (position < array.length) {
      running = await reducer(running, array[position], position);
      position++;
    }
    return running;
  }

  /** Return the first element whose predicate resolves truthy, else `undefined`. */
  static async find<T>(
    array: T[],
    predicate: (item: T) => Promise<boolean>
  ): Promise<T | undefined> {
    for (let position = 0; position < array.length; position++) {
      const candidate = array[position];
      if (await predicate(candidate)) {
        return candidate;
      }
    }
    return undefined;
  }

  /** True as soon as one predicate resolves truthy; false for an empty array. */
  static async some<T>(
    array: T[],
    predicate: (item: T) => Promise<boolean>
  ): Promise<boolean> {
    for (let position = 0; position < array.length; position++) {
      const matched = await predicate(array[position]);
      if (matched) {
        return true;
      }
    }
    return false;
  }

  /** False as soon as one predicate resolves falsy; true for an empty array. */
  static async every<T>(
    array: T[],
    predicate: (item: T) => Promise<boolean>
  ): Promise<boolean> {
    for (let position = 0; position < array.length; position++) {
      const matched = await predicate(array[position]);
      if (!matched) {
        return false;
      }
    }
    return true;
  }
}
// 4. Async Queue
/**
 * FIFO queue whose `dequeue` waits for an item when the queue is empty.
 * Items enqueued while consumers are waiting go straight to the oldest waiter.
 */
class AsyncQueue<T> {
  private queue: T[] = [];
  private pendingWaiters: Array<(value: T) => void> = [];

  /** Add an item, handing it to the longest-waiting consumer if there is one. */
  enqueue(item: T): void {
    const oldestWaiter = this.pendingWaiters.shift();
    if (oldestWaiter !== undefined) {
      oldestWaiter(item);
      return;
    }
    this.queue.push(item);
  }

  /** Remove and return the oldest item, waiting for one if none is buffered. */
  async dequeue(): Promise<T> {
    if (this.queue.length === 0) {
      return new Promise<T>((deliver) => {
        this.pendingWaiters.push(deliver);
      });
    }
    return this.queue.shift()!;
  }

  /** Number of buffered items (waiting consumers are not counted). */
  get size(): number {
    return this.queue.length;
  }
}
// 5. Async Mutex
/**
 * Promise-based mutual exclusion lock. Waiters are served in FIFO order and
 * the lock is handed directly from the releaser to the next waiter.
 */
class AsyncMutex {
  private locked = false;
  private waiters: Array<() => void> = [];

  /** Resolve once the caller owns the lock. */
  async acquire(): Promise<void> {
    if (this.locked) {
      return new Promise<void>((wake) => {
        this.waiters.push(wake);
      });
    }
    this.locked = true;
  }

  /** Pass the lock to the next waiter, or mark it free when nobody waits. */
  release(): void {
    const next = this.waiters.shift();
    if (next !== undefined) {
      // Ownership transfers directly, so `locked` stays true.
      next();
      return;
    }
    this.locked = false;
  }

  /** Run `fn` while holding the lock; the lock is released even if `fn` fails. */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    let outcome: T;
    try {
      outcome = await fn();
    } finally {
      this.release();
    }
    return outcome;
  }
}
// 6. Async Semaphore
/**
 * Promise-based counting semaphore. Waiters are served in FIFO order and a
 * released permit is handed directly to the next waiter.
 */
class AsyncSemaphore {
  private permits: number;
  private waiters: Array<() => void> = [];

  /** @param permits Number of permits initially available. */
  constructor(permits: number) {
    this.permits = permits;
  }

  /** Resolve once a permit has been obtained. */
  async acquire(): Promise<void> {
    if (this.permits <= 0) {
      return new Promise<void>((wake) => {
        this.waiters.push(wake);
      });
    }
    this.permits -= 1;
  }

  /** Give a permit to the next waiter, or return it to the pool. */
  release(): void {
    const next = this.waiters.shift();
    if (next !== undefined) {
      // The permit moves straight to the waiter; the counter is unchanged.
      next();
      return;
    }
    this.permits += 1;
  }

  /** Run `fn` while holding a permit; the permit is released even if `fn` fails. */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    let outcome: T;
    try {
      outcome = await fn();
    } finally {
      this.release();
    }
    return outcome;
  }
}
// 7. Async Cache
/**
 * Cache for asynchronously computed values. Concurrent requests for the same
 * key share one in-flight computation.
 */
class AsyncCache<K, V> {
  private cache = new Map<K, V>();
  private pending = new Map<K, Promise<V>>();

  /**
   * Return the cached value for `key`, join a computation already in flight,
   * or start a new one with `compute`.
   *
   * A failed computation is not remembered: its pending entry is removed, so
   * the next call for the same key runs `compute` again.
   */
  async getOrCompute(key: K, compute: () => Promise<V>): Promise<V> {
    if (this.cache.has(key)) {
      return this.cache.get(key)!;
    }
    const inFlight = this.pending.get(key);
    if (inFlight) {
      return inFlight;
    }
    const promise: Promise<V> = compute().then(
      (value) => {
        // Only publish if this computation is still the registered one; after
        // clear() the entry is gone and the stale result must not be cached.
        if (this.pending.get(key) === promise) {
          this.pending.delete(key);
          this.cache.set(key, value);
        }
        return value;
      },
      (error) => {
        if (this.pending.get(key) === promise) {
          this.pending.delete(key);
        }
        throw error;
      }
    );
    this.pending.set(key, promise);
    return promise;
  }

  /** Return the cached value for `key`, or `undefined` when absent. */
  get(key: K): V | undefined {
    return this.cache.get(key);
  }

  /** Store `value` under `key`. */
  set(key: K, value: V): void {
    this.cache.set(key, value);
  }

  /** Drop all cached values and forget all in-flight computations. */
  clear(): void {
    this.cache.clear();
    this.pending.clear();
  }

  /** Number of cached values (in-flight computations are not counted). */
  get size(): number {
    return this.cache.size;
  }
}
// 8. Async Rate Limiter
/**
 * Limits how many tasks run at once and how quickly queued tasks are admitted.
 *
 * - At most `concurrency` tasks are in flight at any time.
 * - A task starts immediately when a slot is free and nothing is queued.
 * - Otherwise it is queued, and queued tasks are admitted one per
 *   `intervalMs` tick whenever a slot is free.
 *
 * The tick timer only runs while tasks are queued, so an idle limiter does
 * not keep a pending timer alive.
 */
class AsyncRateLimiter {
  private queue: Array<() => void> = [];
  // Number of tasks currently in flight.
  private running = 0;
  private interval: number;
  private readonly concurrency: number;
  private timer: ReturnType<typeof setInterval> | null = null;

  /**
   * @param concurrency Maximum number of simultaneously running tasks (positive integer).
   * @param intervalMs Milliseconds between admissions of queued tasks.
   * @throws RangeError when `concurrency` is not a positive integer.
   */
  constructor(concurrency: number, intervalMs: number) {
    if (!Number.isInteger(concurrency) || concurrency < 1) {
      throw new RangeError('concurrency must be a positive integer');
    }
    this.concurrency = concurrency;
    this.interval = intervalMs;
  }

  /**
   * Run `fn` under the limiter.
   * @returns A promise that settles exactly like the promise returned by `fn`.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    return new Promise((resolve, reject) => {
      const task = async () => {
        this.running++;
        try {
          resolve(await fn());
        } catch (error) {
          reject(error);
        } finally {
          // Free the slot whether the task succeeded or failed.
          this.running--;
        }
      };
      // Only bypass the queue when it is empty, to preserve FIFO order.
      if (this.running < this.concurrency && this.queue.length === 0) {
        void task();
      } else {
        this.queue.push(() => void task());
        this.startTicker();
      }
    });
  }

  /** Start the admission timer if it is not already running. */
  private startTicker(): void {
    if (this.timer !== null) {
      return;
    }
    this.timer = setInterval(() => this.admitNext(), this.interval);
  }

  /** Admit one queued task if a slot is free; stop the timer once the queue is empty. */
  private admitNext(): void {
    if (this.queue.length > 0 && this.running < this.concurrency) {
      const next = this.queue.shift()!;
      next();
    }
    if (this.queue.length === 0 && this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }
}
// Usage Examples
/**
 * Walks through every async example in this module, logging the results to
 * the console.
 */
async function demonstrateAsyncOperations() {
  console.log('=== Web TypeScript Async/Await Operations Examples ===\n');

  // 1. Promise basics
  console.log('--- 1. Promise Basics ---');
  const resolved = await PromiseBasics.resolve(42);
  console.log(`Resolved: ${resolved}`);
  try {
    await PromiseBasics.reject(new Error('Test error'));
  } catch (error) {
    console.log(`Rejected: ${(error as Error).message}`);
  }
  await PromiseBasics.delay(100);
  console.log('Delay completed');

  // 2. Sequential vs parallel
  console.log('\n--- 2. Sequential vs Parallel ---');
  const tasks = [
    () => PromiseBasics.delay(100).then(() => 1),
    () => PromiseBasics.delay(200).then(() => 2),
    () => PromiseBasics.delay(150).then(() => 3)
  ];
  const sequentialStart = Date.now();
  const sequentialResults = await AsyncControlFlow.sequential(tasks);
  console.log(`Sequential: ${sequentialResults.join(', ')} (${Date.now() - sequentialStart}ms)`);
  // Time the parallel run from its own start; reusing the sequential start
  // would add the sequential duration to the reported figure.
  const parallelStart = Date.now();
  const parallelResults = await Promise.all(tasks.map(t => t()));
  console.log(`Parallel: ${parallelResults.join(', ')} (${Date.now() - parallelStart}ms)`);

  // 3. Parallel with limit
  console.log('\n--- 3. Parallel with Limit ---');
  const limitStartTime = Date.now();
  const limitedResults = await AsyncControlFlow.parallelWithLimit(tasks, 2);
  console.log(`Limited (2): ${limitedResults.join(', ')} (${Date.now() - limitStartTime}ms)`);

  // 4. Async map
  console.log('\n--- 4. Async Map ---');
  const numbers = [1, 2, 3, 4, 5];
  const doubled = await AsyncUtilities.map(numbers, async (n) => {
    await PromiseBasics.delay(50);
    return n * 2;
  });
  console.log(`Doubled: ${doubled.join(', ')}`);

  // 5. Async filter
  console.log('\n--- 5. Async Filter ---');
  const evens = await AsyncUtilities.filter(numbers, async (n) => {
    await PromiseBasics.delay(10);
    return n % 2 === 0;
  });
  console.log(`Even numbers: ${evens.join(', ')}`);

  // 6. Async queue
  console.log('\n--- 6. Async Queue ---');
  const queue = new AsyncQueue<number>();
  queue.enqueue(1);
  queue.enqueue(2);
  queue.enqueue(3);
  console.log(`Queue size: ${queue.size}`);
  console.log(`Dequeue: ${await queue.dequeue()}`);
  console.log(`Dequeue: ${await queue.dequeue()}`);

  // 7. Async mutex
  console.log('\n--- 7. Async Mutex ---');
  const mutex = new AsyncMutex();
  const mutexTask = mutex.execute(async () => {
    console.log('Critical section start');
    await PromiseBasics.delay(100);
    console.log('Critical section end');
    return 'done';
  });
  console.log(`Mutex result: ${await mutexTask}`);

  // 8. Async cache
  console.log('\n--- 8. Async Cache ---');
  const cache = new AsyncCache<string, string>();
  const computeResult1 = await cache.getOrCompute('key1', async () => {
    console.log('Computing value for key1');
    await PromiseBasics.delay(100);
    return 'value1';
  });
  const computeResult2 = await cache.getOrCompute('key1', async () => {
    console.log('This should not print');
    return 'value1-different';
  });
  console.log(`Cache results: ${computeResult1}, ${computeResult2}`);
  console.log(`Cache size: ${cache.size}`);

  // 9. Retry
  console.log('\n--- 9. Retry ---');
  let attempts = 0;
  const retryResult = await PromiseBasics.retry(async () => {
    attempts++;
    console.log(`Attempt ${attempts}`);
    if (attempts < 3) {
      throw new Error('Not yet');
    }
    return 'success';
  }, 5, 50);
  console.log(`Retry result: ${retryResult}`);

  // 10. Timeout
  console.log('\n--- 10. Timeout ---');
  try {
    await PromiseBasics.withTimeout(
      PromiseBasics.delay(2000).then(() => 'slow'),
      100,
      'Too slow'
    );
  } catch (error) {
    console.log(`Timeout error: ${(error as Error).message}`);
  }

  console.log('\n=== All Async/Await Examples Completed ===');
}
// Export classes
export { PromiseBasics, AsyncControlFlow, AsyncUtilities, AsyncQueue, AsyncMutex, AsyncSemaphore, AsyncCache, AsyncRateLimiter };
export { demonstrateAsyncOperations };
💻 Synchronisierungsprimitive typescript
🔴 complex
⭐⭐⭐⭐
Synchronisation mit Atomics, SharedArrayBuffer und Sperren implementieren
⏱️ 40 min
🏷️ typescript, web, multithreading, synchronization
Prerequisites:
Advanced TypeScript, Atomics API, SharedArrayBuffer
// Web TypeScript Synchronization Primitives Examples
// Using Atomics, SharedArrayBuffer, and custom synchronization
// 1. Atomics Operations
/**
 * Static convenience wrappers around the `Atomics` API for `Int32Array`
 * views over a `SharedArrayBuffer`.
 */
class AtomicsOperations {
  /** Allocate a SharedArrayBuffer holding `size` 32-bit integers and return a view on it. */
  static createSharedBuffer(size: number): Int32Array {
    const buffer = new SharedArrayBuffer(size * Int32Array.BYTES_PER_ELEMENT);
    return new Int32Array(buffer);
  }

  /**
   * Atomically set `array[index]` to `newValue` if it currently equals `expectedValue`.
   * @returns `true` when the swap happened, `false` otherwise.
   */
  static compareAndSwap(
    array: Int32Array,
    index: number,
    expectedValue: number,
    newValue: number
  ): boolean {
    return Atomics.compareExchange(array, index, expectedValue, newValue) === expectedValue;
  }

  /** Atomically store `value` and return the value that was there before. */
  static exchange(array: Int32Array, index: number, value: number): number {
    return Atomics.exchange(array, index, value);
  }

  /** Atomically add `value`; returns the value BEFORE the addition. */
  static add(array: Int32Array, index: number, value: number): number {
    return Atomics.add(array, index, value);
  }

  /** Atomically subtract `value`; returns the value BEFORE the subtraction. */
  static sub(array: Int32Array, index: number, value: number): number {
    return Atomics.sub(array, index, value);
  }

  /** Atomically add 1; returns the value BEFORE the increment. */
  static increment(array: Int32Array, index: number): number {
    return Atomics.add(array, index, 1);
  }

  /** Atomically subtract 1; returns the value BEFORE the decrement. */
  static decrement(array: Int32Array, index: number): number {
    return Atomics.sub(array, index, 1);
  }

  /** Atomically read `array[index]`. */
  static load(array: Int32Array, index: number): number {
    return Atomics.load(array, index);
  }

  /** Atomically write `value` to `array[index]`. */
  static store(array: Int32Array, index: number, value: number): void {
    Atomics.store(array, index, value);
  }

  /**
   * Wait, without blocking, until `array[index]` is notified.
   *
   * `Atomics.waitAsync` returns a result object rather than a promise: when
   * `async` is false the outcome (`'not-equal'` or `'timed-out'`) is already
   * known in `value`; when `async` is true, `value` is a promise for `'ok'`
   * or `'timed-out'`. Both shapes are normalised to a promise here.
   *
   * @param expectedValue The wait only starts if `array[index]` equals this value.
   * @param timeout Maximum wait in milliseconds; waits indefinitely when omitted.
   */
  static waitAsync(array: Int32Array, index: number, expectedValue: number, timeout?: number): Promise<'ok' | 'not-equal' | 'timed-out'> {
    const outcome = Atomics.waitAsync(array, index, expectedValue, timeout);
    return outcome.async ? outcome.value : Promise.resolve(outcome.value);
  }

  /**
   * Wake up to `count` waiters blocked on `array[index]`.
   * @returns The number of waiters that were woken.
   */
  static notify(array: Int32Array, index: number, count: number): number {
    return Atomics.notify(array, index, count);
  }
}
// 2. Spinlock
/**
 * Blocking lock built on a single shared 32-bit flag (0 = free, 1 = held).
 *
 * `acquire` blocks the calling thread with `Atomics.wait` while the lock is
 * held, so it must run on a thread where blocking is permitted (typically a
 * worker rather than a browser's main thread).
 */
class Spinlock {
  private locked: Int32Array;

  constructor() {
    this.locked = AtomicsOperations.createSharedBuffer(1);
  }

  /** Block until the lock has been acquired. */
  acquire(): void {
    while (true) {
      // Set the flag unconditionally; the lock was free, and is now ours, only
      // if the previous value was 0.
      const previous = Atomics.exchange(this.locked, 0, 1);
      if (previous === 0) {
        return;
      }
      // Sleep while the flag is still 1; release() wakes one waiter.
      Atomics.wait(this.locked, 0, 1);
    }
  }

  /** Release the lock and wake one waiting thread. */
  release(): void {
    AtomicsOperations.store(this.locked, 0, 0);
    AtomicsOperations.notify(this.locked, 0, 1);
  }
}
// 3. Mutex (Better than Spinlock)
/**
 * Non-blocking mutex built on a single shared 32-bit flag (0 = free,
 * 1 = held). Contended acquisitions wait with `Atomics.waitAsync`, so the
 * calling thread is never blocked.
 */
class Mutex {
  private locked: Int32Array;

  constructor() {
    this.locked = AtomicsOperations.createSharedBuffer(1);
  }

  /**
   * Try to acquire the lock.
   * @param timeoutMs Maximum time to wait in milliseconds.
   * @returns `true` once the lock is held, `false` if the timeout elapsed first.
   */
  async acquire(timeoutMs: number = 5000): Promise<boolean> {
    const deadline = Date.now() + timeoutMs;
    while (true) {
      // compareAndSwap returns a boolean: true means this caller flipped the
      // flag from 0 to 1 and now owns the lock.
      if (AtomicsOperations.compareAndSwap(this.locked, 0, 0, 1)) {
        return true;
      }
      const remaining = deadline - Date.now();
      if (remaining <= 0) {
        return false;
      }
      // Wait for a release, capped at 100 ms as a periodic re-check and never
      // longer than the time left before the deadline.
      await AtomicsOperations.waitAsync(this.locked, 0, 1, Math.min(remaining, 100));
    }
  }

  /** Release the lock and wake one waiter. */
  release(): void {
    AtomicsOperations.store(this.locked, 0, 0);
    AtomicsOperations.notify(this.locked, 0, 1);
  }

  /**
   * Run `fn` while holding the lock; the lock is released even if `fn` fails.
   * @param timeoutMs Maximum time to wait for the lock (default as in `acquire`).
   * @throws Error 'Mutex acquisition timeout' when the lock could not be acquired in time.
   */
  async execute<T>(fn: () => Promise<T>, timeoutMs?: number): Promise<T> {
    const acquired = await this.acquire(timeoutMs);
    if (!acquired) {
      throw new Error('Mutex acquisition timeout');
    }
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}
// 4. Semaphore
/**
 * Non-blocking counting semaphore whose permit count lives in a shared
 * 32-bit integer. Contended acquisitions wait with `Atomics.waitAsync`.
 */
class Semaphore {
  private permits: Int32Array;

  /** @param initialPermits Number of permits initially available. */
  constructor(initialPermits: number) {
    this.permits = AtomicsOperations.createSharedBuffer(1);
    AtomicsOperations.store(this.permits, 0, initialPermits);
  }

  /**
   * Try to take one permit.
   * @param timeoutMs Maximum time to wait in milliseconds.
   * @returns `true` once a permit is held, `false` if the timeout elapsed first.
   */
  async acquire(timeoutMs: number = 5000): Promise<boolean> {
    const deadline = Date.now() + timeoutMs;
    while (true) {
      const current = AtomicsOperations.load(this.permits, 0);
      if (current > 0) {
        // compareAndSwap returns a boolean: true means the count went from
        // `current` to `current - 1` on behalf of this caller.
        if (AtomicsOperations.compareAndSwap(this.permits, 0, current, current - 1)) {
          return true;
        }
        // The count changed between load and swap; re-read it right away.
        continue;
      }
      const remaining = deadline - Date.now();
      if (remaining <= 0) {
        return false;
      }
      // No permits: wait for a release, capped at 100 ms and at the time left.
      await AtomicsOperations.waitAsync(this.permits, 0, current, Math.min(remaining, 100));
    }
  }

  /** Return one permit and wake one waiter. */
  release(): void {
    AtomicsOperations.increment(this.permits, 0);
    AtomicsOperations.notify(this.permits, 0, 1);
  }

  /**
   * Run `fn` while holding a permit; the permit is returned even if `fn` fails.
   * @param timeoutMs Maximum time to wait for a permit (default as in `acquire`).
   * @throws Error 'Semaphore acquisition timeout' when no permit became
   *         available in time. In that case `fn` is not run and nothing is released.
   */
  async execute<T>(fn: () => Promise<T>, timeoutMs?: number): Promise<T> {
    const acquired = await this.acquire(timeoutMs);
    if (!acquired) {
      throw new Error('Semaphore acquisition timeout');
    }
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}
// 5. Barrier
/**
 * Reusable barrier: `wait()` resolves once `parties` callers have arrived.
 *
 * `waiting` counts arrivals in the current round. `count` is a generation
 * number that is advanced each time the barrier trips; waiters sleep until
 * the generation they arrived in has passed.
 */
class Barrier {
  private count: Int32Array;
  private waiting: Int32Array;
  private total: number;

  /** @param parties Number of callers that must arrive before all are released. */
  constructor(parties: number) {
    this.total = parties;
    this.count = AtomicsOperations.createSharedBuffer(1);
    this.waiting = AtomicsOperations.createSharedBuffer(1);
  }

  /** Arrive at the barrier and wait until all parties of this round have arrived. */
  async wait(): Promise<void> {
    const generation = AtomicsOperations.load(this.count, 0);
    // add() returns the value before the addition, so +1 gives this caller's
    // arrival number within the round.
    const arrived = AtomicsOperations.add(this.waiting, 0, 1) + 1;
    if (arrived >= this.total) {
      // Last party: start a new round and release everyone who is waiting.
      AtomicsOperations.store(this.waiting, 0, 0);
      AtomicsOperations.add(this.count, 0, 1);
      AtomicsOperations.notify(this.count, 0, this.total);
    } else {
      // Re-check after every wake-up so only a generation change releases us.
      while (AtomicsOperations.load(this.count, 0) === generation) {
        await AtomicsOperations.waitAsync(this.count, 0, generation);
      }
    }
  }

  /**
   * Reset the barrier for a fresh round. Parties currently blocked in
   * `wait()` are released rather than left waiting on a round that no longer exists.
   */
  reset(): void {
    AtomicsOperations.store(this.waiting, 0, 0);
    AtomicsOperations.add(this.count, 0, 1);
    AtomicsOperations.notify(this.count, 0, this.total);
  }
}
// 6. Read-Write Lock
/**
 * Non-blocking read-write lock: many concurrent readers or one writer.
 *
 * `readers` holds the number of active readers and `writer` is a flag
 * (0 = no writer, 1 = a writer holds or is claiming the lock). A writer
 * first claims the flag, which makes new readers back off, and then waits
 * for the active readers to drain.
 */
class ReadWriteLock {
  private readers: Int32Array;
  private writer: Int32Array;

  constructor() {
    this.readers = AtomicsOperations.createSharedBuffer(1);
    this.writer = AtomicsOperations.createSharedBuffer(1);
  }

  /** Resolve once the caller holds a read lock. */
  async acquireReadLock(): Promise<void> {
    while (true) {
      // Sleep while a writer holds or is claiming the lock. The expected
      // value is 1 so the wait really parks instead of returning 'not-equal'.
      while (AtomicsOperations.load(this.writer, 0) !== 0) {
        await AtomicsOperations.waitAsync(this.writer, 0, 1);
      }
      AtomicsOperations.increment(this.readers, 0);
      // Re-check: a writer may have claimed the flag just before the increment.
      if (AtomicsOperations.load(this.writer, 0) === 0) {
        return;
      }
      // Writer became active: back off and retry.
      this.leaveReadSection();
    }
  }

  /** Release a read lock previously obtained with `acquireReadLock`. */
  releaseReadLock(): void {
    this.leaveReadSection();
  }

  /** Resolve once the caller holds the exclusive write lock. */
  async acquireWriteLock(): Promise<void> {
    // Claim the writer flag. compareAndSwap returns a boolean: true means
    // the flag went from 0 to 1 on behalf of this caller.
    while (!AtomicsOperations.compareAndSwap(this.writer, 0, 0, 1)) {
      await AtomicsOperations.waitAsync(this.writer, 0, 1);
    }
    // New readers now back off; wait for the active ones to finish. Waiting
    // on the observed count makes any change to it wake this loop.
    let active = AtomicsOperations.load(this.readers, 0);
    while (active !== 0) {
      await AtomicsOperations.waitAsync(this.readers, 0, active);
      active = AtomicsOperations.load(this.readers, 0);
    }
  }

  /** Release the write lock and wake every waiting reader and writer. */
  releaseWriteLock(): void {
    AtomicsOperations.store(this.writer, 0, 0);
    // All readers may proceed together, so wake everyone parked on the flag.
    AtomicsOperations.notify(this.writer, 0, Number.POSITIVE_INFINITY);
  }

  /** Run `fn` under a read lock; the lock is released even if `fn` fails. */
  async executeRead<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquireReadLock();
    try {
      return await fn();
    } finally {
      this.releaseReadLock();
    }
  }

  /** Run `fn` under the write lock; the lock is released even if `fn` fails. */
  async executeWrite<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquireWriteLock();
    try {
      return await fn();
    } finally {
      this.releaseWriteLock();
    }
  }

  /** Drop one reader and wake the writer that may be waiting for readers to drain. */
  private leaveReadSection(): void {
    AtomicsOperations.decrement(this.readers, 0);
    AtomicsOperations.notify(this.readers, 0, 1);
  }
}
// 7. Concurrent Counter
/**
 * Integer counter stored in slot 0 of a one-element shared Int32 buffer.
 *
 * Every method is a thin wrapper over an `AtomicsOperations` call. The
 * "xAndGet" variants derive the new value from the value returned by
 * `add`/`sub`; the "getAndX" variants return that value unchanged.
 */
class ConcurrentCounter {
  private counter: Int32Array;

  /**
   * @param initialValue Value written to the counter at construction (default 0).
   */
  constructor(initialValue: number = 0) {
    const cell = AtomicsOperations.createSharedBuffer(1);
    this.counter = cell;
    AtomicsOperations.store(cell, 0, initialValue);
  }

  /** Adds 1 and returns the value returned by `add`, plus 1. */
  incrementAndGet(): number {
    const before = AtomicsOperations.add(this.counter, 0, 1);
    return before + 1;
  }

  /** Adds 1 and returns the value returned by `add`. */
  getAndIncrement(): number {
    const before = AtomicsOperations.add(this.counter, 0, 1);
    return before;
  }

  /** Subtracts 1 and returns the value returned by `sub`, minus 1. */
  decrementAndGet(): number {
    const before = AtomicsOperations.sub(this.counter, 0, 1);
    return before - 1;
  }

  /** Subtracts 1 and returns the value returned by `sub`. */
  getAndDecrement(): number {
    const before = AtomicsOperations.sub(this.counter, 0, 1);
    return before;
  }

  /** Returns the value loaded from the counter slot. */
  get(): number {
    const current = AtomicsOperations.load(this.counter, 0);
    return current;
  }

  /** Stores `value` into the counter slot. */
  set(value: number): void {
    AtomicsOperations.store(this.counter, 0, value);
  }

  /** Adds `value` and returns the value returned by `add`, plus `value`. */
  addAndGet(value: number): number {
    const before = AtomicsOperations.add(this.counter, 0, value);
    return before + value;
  }
}
// 8. Producer-Consumer Queue
/**
 * Bounded FIFO queue backed by a fixed-length ring buffer.
 *
 * The items live in a plain array; capacity, item count, head index and tail
 * index each live in slot 0 of their own shared Int32 buffer and are accessed
 * through `AtomicsOperations`. `enqueue` waits while the queue is full and
 * `dequeue` waits while it is empty, both parking on the item-count slot.
 */
class ProducerConsumerQueue<T> {
  // Ring buffer storage; a slot is undefined while it holds no item.
  private buffer: (T | undefined)[];
  // Slot 0: capacity of the ring buffer.
  private size: Int32Array;
  // Slot 0: number of items currently stored. Named `itemCount` so it does
  // not collide with the public `count` getter below: a field and an accessor
  // with the same name is a duplicate-identifier error and makes the
  // constructor assignment hit a getter-only property.
  private itemCount: Int32Array;
  // Slot 0: index of the next item to dequeue.
  private head: Int32Array;
  // Slot 0: index of the next free slot to enqueue into.
  private tail: Int32Array;

  /**
   * @param capacity Maximum number of items the queue holds at once.
   */
  constructor(capacity: number) {
    this.buffer = new Array<T | undefined>(capacity);
    this.size = AtomicsOperations.createSharedBuffer(1);
    this.itemCount = AtomicsOperations.createSharedBuffer(1);
    this.head = AtomicsOperations.createSharedBuffer(1);
    this.tail = AtomicsOperations.createSharedBuffer(1);
    AtomicsOperations.store(this.size, 0, capacity);
  }

  /**
   * Appends `item`, waiting for free space while the queue is full.
   *
   * @param item Value to append at the tail of the queue.
   */
  async enqueue(item: T): Promise<void> {
    while (true) {
      const currentCount = AtomicsOperations.load(this.itemCount, 0);
      const capacity = AtomicsOperations.load(this.size, 0);
      if (currentCount < capacity) {
        // Not full: write at the tail and advance it with wrap-around.
        const tail = AtomicsOperations.load(this.tail, 0);
        this.buffer[tail] = item;
        AtomicsOperations.store(this.tail, 0, (tail + 1) % capacity);
        AtomicsOperations.increment(this.itemCount, 0);
        // Wake one waiter parked on the item count.
        AtomicsOperations.notify(this.itemCount, 0, 1);
        return;
      }
      // Full: park until the item count moves away from the observed value.
      await AtomicsOperations.waitAsync(this.itemCount, 0, currentCount);
    }
  }

  /**
   * Removes and returns the oldest item, waiting while the queue is empty.
   *
   * @returns The item at the head of the queue.
   */
  async dequeue(): Promise<T> {
    while (true) {
      const currentCount = AtomicsOperations.load(this.itemCount, 0);
      if (currentCount > 0) {
        // Not empty: read the head slot, clear it, and advance with wrap-around.
        const head = AtomicsOperations.load(this.head, 0);
        const capacity = AtomicsOperations.load(this.size, 0);
        // The slot is occupied because the item count is positive.
        const item = this.buffer[head] as T;
        this.buffer[head] = undefined;
        AtomicsOperations.store(this.head, 0, (head + 1) % capacity);
        AtomicsOperations.decrement(this.itemCount, 0);
        // Wake one waiter parked on the item count.
        AtomicsOperations.notify(this.itemCount, 0, 1);
        return item;
      }
      // Empty: park until the item count moves away from the observed value.
      await AtomicsOperations.waitAsync(this.itemCount, 0, currentCount);
    }
  }

  /** Number of items currently stored in the queue. */
  get count(): number {
    return AtomicsOperations.load(this.itemCount, 0);
  }
}
// Usage Examples
/**
 * Console walkthrough of the synchronization primitives in this file:
 * raw atomics on a shared buffer, ConcurrentCounter, Mutex, Semaphore,
 * Barrier and ProducerConsumerQueue, in that order. Each section logs its
 * own header and results.
 */
async function demonstrateSynchronization(): Promise<void> {
  console.log('=== Web TypeScript Synchronization Primitives Examples ===\n');

  // 1. Atomics operations
  console.log('--- 1. Atomics Operations ---');
  const shared = AtomicsOperations.createSharedBuffer(5);
  for (const slot of shared.keys()) {
    AtomicsOperations.store(shared, slot, slot * 10);
  }
  console.log('Buffer values:');
  shared.forEach((cell, slot) => {
    console.log(` [${slot}] = ${cell}`);
  });
  AtomicsOperations.add(shared, 0, 5);
  console.log(`After add to [0]: ${shared[0]}`);

  // 2. Concurrent counter
  console.log('\n--- 2. Concurrent Counter ---');
  const tally = new ConcurrentCounter(0);
  // Each task sleeps for a random time and then bumps the counter once.
  const bumpAfterRandomDelay = async (): Promise<void> => {
    await PromiseBasics.delay(Math.random() * 100);
    tally.incrementAndGet();
  };
  const pendingBumps = Array.from({ length: 10 }, () => bumpAfterRandomDelay());
  await Promise.all(pendingBumps);
  console.log(`Final counter value: ${tally.get()}`);

  // 3. Mutex
  console.log('\n--- 3. Mutex ---');
  const mutex = new Mutex();
  let sharedResource = 0;
  // Read-modify-write with a pause in the middle, run under the mutex.
  const readPauseWrite = (taskId: number) => async (): Promise<number> => {
    const seen = sharedResource;
    console.log(`Task ${taskId} read: ${seen}`);
    await PromiseBasics.delay(50);
    sharedResource = seen + 1;
    console.log(`Task ${taskId} wrote: ${sharedResource}`);
    return sharedResource;
  };
  const guardedWrites = Array.from({ length: 5 }, (_, taskId) =>
    mutex.execute(readPauseWrite(taskId))
  );
  await Promise.all(guardedWrites);
  console.log(`Final shared resource value: ${sharedResource}`);

  // 4. Semaphore
  console.log('\n--- 4. Semaphore ---');
  const semaphore = new Semaphore(2); // constructed with 2
  let activeCount = 0;
  let maxActive = 0;
  const trackedWork = (taskId: number) => async (): Promise<void> => {
    activeCount += 1;
    if (activeCount > maxActive) {
      maxActive = activeCount;
    }
    console.log(`Task ${taskId} started (active: ${activeCount})`);
    await PromiseBasics.delay(Math.random() * 200);
    activeCount -= 1;
    console.log(`Task ${taskId} completed`);
  };
  const limitedRuns = Array.from({ length: 10 }, (_, taskId) =>
    semaphore.execute(trackedWork(taskId))
  );
  await Promise.all(limitedRuns);
  console.log(`Max concurrent tasks: ${maxActive}`);

  // 5. Barrier
  console.log('\n--- 5. Barrier ---');
  const barrier = new Barrier(3);
  const arrivals = Array.from({ length: 3 }, async (_, taskId) => {
    await barrier.wait();
    console.log(`Task ${taskId} passed barrier`);
  });
  await Promise.all(arrivals);

  // 6. Producer-Consumer
  console.log('\n--- 6. Producer-Consumer Queue ---');
  const queue = new ProducerConsumerQueue<number>(5);
  // Producer: one item now, two more from timers at 100 ms and 200 ms.
  queue.enqueue(1);
  const delayedItems: Array<[number, number]> = [
    [2, 100],
    [3, 200],
  ];
  for (const [value, afterMs] of delayedItems) {
    setTimeout(() => queue.enqueue(value), afterMs);
  }
  // Consumer: take two items, then report what is left.
  for (let taken = 0; taken < 2; taken += 1) {
    const item = await queue.dequeue();
    console.log(`Dequeued: ${item}`);
  }
  console.log(`Queue count: ${queue.count}`);

  console.log('\n=== All Synchronization Primitives Examples Completed ===');
}
// PromiseBasics helper (defined locally here rather than imported)
/**
 * Small promise helper used by the usage examples.
 */
class PromiseBasics {
  /**
   * Returns a promise that resolves (with no value) once a `setTimeout`
   * scheduled for `ms` milliseconds fires.
   *
   * @param ms Delay passed to `setTimeout`, in milliseconds.
   */
  static delay(ms: number): Promise<void> {
    return new Promise<void>((done) => {
      setTimeout(done, ms);
    });
  }
}
// Export classes
export { AtomicsOperations, Spinlock, Mutex, Semaphore, Barrier, ReadWriteLock, ConcurrentCounter, ProducerConsumerQueue };
export { demonstrateSynchronization };