
yieldless/queue

Bounded async queues with abortable offer, take, close, and async iteration.

yieldless/queue connects producers and consumers without a framework runtime. It is useful when work arrives faster than it can be processed, or when a worker loop should consume values as they appear.

The queue supports bounded capacity, producer backpressure, abortable waits, explicit close, draining, and for await consumption.

Exports

  • createQueue<T, E = Error>({ capacity }): AsyncQueue<T, E>
  • class QueueClosedError extends Error
  • type AsyncQueue<T, E> = { offer, take, close, drain, clear, size, capacity, closed, pendingOffers, pendingTakes } & AsyncIterable<T>
  • type QueueOperationOptions = { signal?: AbortSignal }

Example

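import { createQueue } from "yieldless/queue";

const queue = createQueue<string>({ capacity: 100 });

// The signal cancels both the worker loop and any waiting offer().
const controller = new AbortController();
const { signal } = controller;

// Consume job ids until the queue closes or the signal aborts.
async function worker(signal: AbortSignal) {
  for await (const jobId of queue) {
    if (signal.aborted) return;
    // processJob is application code, not part of yieldless.
    await processJob(jobId, signal);
  }
}

// Waits for a free slot if the queue is full.
await queue.offer("job-1", { signal });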
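
Because offer() resolves to a tuple rather than throwing (see Behavior notes), a producer loop can stop at the first abort or close without a try/catch. The following is a minimal sketch, not part of yieldless: offerAll and the Offerable interface are hypothetical names, and the assumption that a successful offer resolves with null in the first tuple position is inferred from the [reason, null] shape described for close().

```typescript
// Hypothetical structural type: only the part of AsyncQueue this helper needs.
// Assumption: the first tuple slot is null on success and the error otherwise.
type QueueOperationOptions = { signal?: AbortSignal };
type OfferResult<E> = readonly [E | null, unknown];

interface Offerable<T, E = Error> {
  offer(value: T, options?: QueueOperationOptions): Promise<OfferResult<E>>;
}

// Offers items in order and stops at the first error (abort or close).
// Returns how many items were accepted and the error that stopped the loop, if any.
async function offerAll<T, E = Error>(
  queue: Offerable<T, E>,
  items: Iterable<T>,
  options?: QueueOperationOptions,
): Promise<{ accepted: number; error: E | null }> {
  let accepted = 0;
  for (const item of items) {
    const [error] = await queue.offer(item, options);
    if (error !== null) return { accepted, error };
    accepted += 1;
  }
  return { accepted, error: null };
}
```

A caller can then decide what to do with the items that were not accepted, for example retrying them later or reporting the error upstream.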

Behavior notes

  • capacity defaults to Infinity.
  • A full bounded queue makes offer() wait until a consumer takes a value.
  • A waiting take() receives the next offered value immediately.
  • offer() and take() resolve to result tuples instead of throwing on abort or close.
  • close(reason) resolves pending and future operations with [reason, null].
  • Async iteration ends when the queue closes.
  • drain() removes buffered values and then gives pending offers a chance to enter the queue.
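
The notes above can be modeled in a few dozen lines. This sketch is an illustrative model written for this page, not the yieldless/queue implementation: SketchQueue is a hypothetical name; abort signals, drain(), clear(), and async iteration are left out; and having take() stop returning buffered values after close is an assumption the notes do not settle.

```typescript
// Illustrative model only; NOT the yieldless/queue source.
type Result<T, E> = [null, T] | [E, null];
type Parked<T, E> = { value: T; settle: (r: Result<null, E>) => void };

class SketchQueue<T, E = Error> {
  readonly capacity: number;
  private buffer: T[] = [];
  private takers: Array<(r: Result<T, E>) => void> = [];
  private offers: Parked<T, E>[] = [];
  private closeReason: E | null = null;
  private isClosed = false;

  constructor(capacity = Infinity) {
    this.capacity = capacity; // assumed to be >= 1 in this sketch
  }

  get size(): number { return this.buffer.length; }
  get pendingOffers(): number { return this.offers.length; }
  get pendingTakes(): number { return this.takers.length; }
  get closed(): boolean { return this.isClosed; }

  async offer(value: T): Promise<Result<null, E>> {
    if (this.isClosed) return [this.closeReason as E, null];
    const taker = this.takers.shift();
    if (taker) {
      // A waiting take() receives the offered value directly.
      taker([null, value]);
      return [null, null];
    }
    if (this.buffer.length < this.capacity) {
      this.buffer.push(value);
      return [null, null];
    }
    // Full: park the producer until a consumer frees a slot (backpressure).
    return new Promise<Result<null, E>>((settle) => {
      this.offers.push({ value, settle });
    });
  }

  async take(): Promise<Result<T, E>> {
    if (this.isClosed) return [this.closeReason as E, null];
    if (this.buffer.length > 0) {
      const value = this.buffer.shift() as T;
      // The freed slot goes to the oldest parked offer, if there is one.
      const parked = this.offers.shift();
      if (parked) {
        this.buffer.push(parked.value);
        parked.settle([null, null]);
      }
      return [null, value];
    }
    // Empty: park the consumer until a value is offered or the queue closes.
    return new Promise<Result<T, E>>((resolve) => {
      this.takers.push(resolve);
    });
  }

  close(reason: E): void {
    if (this.isClosed) return;
    this.isClosed = true;
    this.closeReason = reason;
    // Pending operations resolve with [reason, null]; future ones do too.
    for (const settle of this.takers.splice(0)) settle([reason, null]);
    for (const parked of this.offers.splice(0)) parked.settle([reason, null]);
  }
}
```

With a capacity of 1, a second offer() stays pending until a take() frees the slot, which is the backpressure the notes describe. No operation in the model rejects, so callers branch on the first tuple element instead of catching exceptions.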

Good

Use a bounded queue to make backpressure visible.

const thumbnails = createQueue<ImageJob>({ capacity: 50 });

await thumbnails.offer(job, { signal });

Consume with ordinary async iteration.

for await (const job of thumbnails) {
  await renderThumbnail(job, signal);
}

Avoid

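Do not use an unbounded queue for input you do not control. With the default capacity of Infinity the queue is never full, so offer() never waits and the buffer can grow without limit.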

const queue = createQueue<IncomingRequest>();

Prefer a capacity that matches the worker pool or memory budget.

const queue = createQueue<IncomingRequest>({ capacity: 200 });
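
One way to turn "matches the worker pool or memory budget" into a number is to take the smaller of a worker-based limit and a memory-based limit. The helper below is a hypothetical heuristic, not part of yieldless: suggestCapacity and its field names are made up for illustration, and every input is an estimate you would supply yourself.

```typescript
// Hypothetical sizing inputs; none of these names come from yieldless.
interface CapacityInputs {
  workers: number;           // number of concurrent consumers
  backlogPerWorker: number;  // items each worker may have waiting
  memoryBudgetBytes: number; // memory you are willing to spend on buffered items
  averageItemBytes: number;  // rough size of one buffered item
}

// Returns the smaller of the two limits, never less than 1.
function suggestCapacity(inputs: CapacityInputs): number {
  const byWorkers = inputs.workers * inputs.backlogPerWorker;
  const byMemory = Math.floor(inputs.memoryBudgetBytes / inputs.averageItemBytes);
  return Math.max(1, Math.min(byWorkers, byMemory));
}

// 8 workers * 25 items = 200; 64 MiB / 256 KiB = 256 items; the smaller is 200.
const capacity = suggestCapacity({
  workers: 8,
  backlogPerWorker: 25,
  memoryBudgetBytes: 64 * 1024 * 1024,
  averageItemBytes: 256 * 1024,
});
```

The resulting number would be passed as the capacity option to createQueue. Whichever limit is smaller wins, so a tight memory budget overrides a generous per-worker backlog.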
