
yieldless/iterable

Tuple helpers for sync and async iterable workflows.

yieldless/iterable handles streams of ordinary JavaScript values without introducing a stream runtime. It accepts both Iterable and AsyncIterable sources, captures failures thrown by the iterator as error tuples, and forwards the caller's AbortSignal to every worker and mapper.

Exports

  • collect(iterable, options): Promise<SafeResult<T[], E>>
  • forEach(iterable, worker, options): Promise<SafeResult<void, E>>
  • mapAsyncLimit(iterable, mapper, options): Promise<SafeResult<Value[], E>>
  • type AnyIterable<T> = Iterable<T> | AsyncIterable<T>
  • type IterableWorker<Item, E = Error> = (item, index, signal) => SafeResult<void, E> | PromiseLike<SafeResult<void, E>>
  • type IterableMapper<Item, Value, E = Error> = (item, index, signal) => SafeResult<Value, E> | PromiseLike<SafeResult<Value, E>>
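
Every signature above returns a SafeResult, which this page does not define. The examples further down return `[fetchError, null]` on failure and `[null, undefined]` on success, so it appears to be an error-first tuple. The following is a minimal sketch under that assumption; `SafeResultSketch`, `ok`, `fail`, and `summarize` are hypothetical names, not yieldless exports, and the real type may differ in detail.

```typescript
// Assumed shape, inferred from the tuples returned in this page's examples.
// The real SafeResult type is defined by yieldless and may differ.
type SafeResultSketch<T, E = Error> =
  | readonly [error: E, value: null]
  | readonly [error: null, value: T];

// Hypothetical helpers for building each variant.
const ok = <T>(value: T): SafeResultSketch<T, never> => [null, value];
const fail = <E>(error: E): SafeResultSketch<never, E> => [error, null];

// Check the error slot first; on the success path the value slot holds the result.
function summarize(result: SafeResultSketch<number>): string {
  const [error, value] = result;
  if (error) {
    return `failed: ${error.message}`;
  }
  return `value: ${value}`;
}
```

Callers follow the same pattern as the examples on this page: destructure the tuple, return early when the error slot is set, and use the value otherwise.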

Collect

import { collect } from "yieldless/iterable";

const [error, lines] = await collect(readLines(filePath), { signal });
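
To make the tuple-capture behavior concrete, here is a rough stand-in for what collect() is described as doing: drain the source and turn a thrown iterator failure into an error tuple. `collectSketch` and `failingLines` are hypothetical, the sketch ignores the options argument, and it should not be read as the library's implementation.

```typescript
type AnyIterable<T> = Iterable<T> | AsyncIterable<T>;
type Result<T> = readonly [Error, null] | readonly [null, T];

// Stand-in for collect(): gathers every item into an array and reports a
// thrown iterator failure as an error tuple instead of rejecting.
async function collectSketch<T>(iterable: AnyIterable<T>): Promise<Result<T[]>> {
  const items: T[] = [];
  try {
    // `for await` accepts both sync and async iterables.
    for await (const item of iterable) {
      items.push(item);
    }
  } catch (thrown) {
    const error = thrown instanceof Error ? thrown : new Error(String(thrown));
    return [error, null];
  }
  return [null, items];
}

// Hypothetical source that fails partway through.
async function* failingLines(): AsyncGenerator<string> {
  yield "first";
  throw new Error("disk read failed");
}
```

With this stand-in, `await collectSketch(failingLines())` resolves to a tuple whose first slot holds the thrown error, so the caller never needs a try/catch around the iteration.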

Sequential work

import { forEach } from "yieldless/iterable";

const [error] = await forEach(
  readRows(filePath),
  async (row, _index, signal) => writeRow(row, signal),
  { signal },
);

Accumulating results with forEach

forEach() is useful when each item performs side effects and you want to decide exactly what gets accumulated.

import { fetchJsonSafe } from "yieldless/fetch";
import { forEach } from "yieldless/iterable";

const users: User[] = [];

const [error] = await forEach(
  ids,
  async (id, _index, signal) => {
    const [fetchError, user] = await fetchJsonSafe<User>(
      `https://api.example.com/users/${String(id)}`,
      {
        timeoutMs: 5_000,
        signal,
      },
    );

    if (fetchError) {
      return [fetchError, null] as const;
    }

    users.push(user);
    return [null, undefined] as const;
  },
  { signal },
);

if (error) {
  return [error, null] as const;
}

return [null, users] as const;

See Read IDs and Fetch Records for the full version, which first reads and parses the IDs from a file.

Bounded mapping

import { mapAsyncLimit } from "yieldless/iterable";

const [error, thumbnails] = await mapAsyncLimit(
  readImages(source),
  (image, _index, signal) => renderThumbnail(image, signal),
  {
    concurrency: 4,
    signal,
  },
);

mapAsyncLimit() preserves input order in the returned array while keeping at most the configured number of mappers active at any one time.
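
One common way to get both properties is to run a fixed number of "lanes" that each claim the next unprocessed index and write their result into that slot. The sketch below illustrates the idea for plain arrays only; `mapLimitSketch` is a hypothetical stand-in and is not how yieldless necessarily implements mapAsyncLimit().

```typescript
type Result<T> = readonly [Error, null] | readonly [null, T];

// Stand-in for bounded mapping over an array: at most `concurrency` mappers
// run at once, results keep input order, and the first error aborts the
// signal handed to the remaining mappers.
async function mapLimitSketch<Item, Value>(
  items: readonly Item[],
  mapper: (item: Item, index: number, signal: AbortSignal) => Promise<Result<Value>>,
  concurrency: number,
): Promise<Result<Value[]>> {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    return [new Error("concurrency must be a positive integer"), null];
  }
  const controller = new AbortController();
  const results = new Array<Value>(items.length);
  const state = { error: null as Error | null, next: 0 };

  // Record only the first failure and tell in-flight mappers to stop.
  const failWith = (error: Error): void => {
    if (state.error === null) state.error = error;
    controller.abort();
  };

  // Each lane claims the next unprocessed index until input runs out or a failure is recorded.
  const lane = async (): Promise<void> => {
    while (state.error === null && state.next < items.length) {
      const index = state.next++;
      try {
        const result = await mapper(items[index], index, controller.signal);
        if (result[0] !== null) return failWith(result[0]);
        results[index] = result[1]; // Slot by index, so order is preserved.
      } catch (thrown) {
        // A thrown mapper failure is treated like an error tuple.
        return failWith(thrown instanceof Error ? thrown : new Error(String(thrown)));
      }
    }
  };

  const laneCount = Math.min(concurrency, items.length);
  await Promise.all(Array.from({ length: laneCount }, lane));
  return state.error ? [state.error, null] : [null, results];
}
```

Writing each result to its own index, rather than pushing in completion order, is what keeps the output aligned with the input even when later items finish first.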

Behavior notes

  • Iterator throws are captured as tuple errors.
  • Worker and mapper throws are captured as tuple errors.
  • The first tuple error aborts in-flight bounded mapping work.
  • forEach() is sequential by design; use mapAsyncLimit() for parallelism.
  • concurrency must be a positive integer.
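
Aborting in-flight work only helps if workers and mappers actually honor the signal they receive. As a sketch of what that can look like, the hypothetical helper below waits for a timer but settles early with an error tuple when the signal aborts. `delayOrAbort` and its error messages are illustrative assumptions, not part of yieldless.

```typescript
type Result<T> = readonly [Error, null] | readonly [null, T];

// Hypothetical worker step: waits `ms` milliseconds, but settles early with
// an error tuple if the forwarded signal aborts first.
function delayOrAbort(ms: number, signal: AbortSignal): Promise<Result<void>> {
  return new Promise((resolve) => {
    if (signal.aborted) {
      resolve([new Error("aborted before start"), null]);
      return;
    }
    const onAbort = (): void => {
      clearTimeout(timer); // Stop the pending work so nothing leaks.
      resolve([new Error("aborted while waiting"), null]);
    };
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve([null, undefined]);
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}
```

A worker built this way can be passed the `signal` argument it receives as its third parameter, in the same position the examples on this page pass it to `writeRow` and `renderThumbnail`.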

Good

Use forEach() when order and backpressure matter more than throughput.

const [error] = await forEach(
  readLogLines(filePath),
  async (line, index, signal) => writeLine(index, line, signal),
  { signal },
);

Use mapAsyncLimit() for many independent operations.

const [error, results] = await mapAsyncLimit(
  readRepositories(workspace),
  (repo, _index, signal) => inspectRepository(repo, signal),
  { concurrency: 4, signal },
);

Avoid

Do not materialize a huge async iterable just so you can use array helpers.

const rows = [];
for await (const row of readRows(file)) {
  rows.push(row);
}

await Promise.all(rows.map(processRow));

Use forEach() or mapAsyncLimit() so the iterable can stream and cancellation can stop the work early.
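
To illustrate why streaming matters, the sketch below uses a hypothetical sequential stand-in, `forEachSketch`, together with a made-up source, `readRowsSketch`, that counts how many rows it was asked to produce. Neither is part of yieldless; they only demonstrate that a lazily pulled source stops producing once the first error tuple ends the loop.

```typescript
type AnyIterable<T> = Iterable<T> | AsyncIterable<T>;
type Result<T> = readonly [Error, null] | readonly [null, T];

// Stand-in for sequential iteration: one item at a time, stopping at the
// first error tuple or thrown failure.
async function forEachSketch<T>(
  iterable: AnyIterable<T>,
  worker: (item: T, index: number) => Promise<Result<void>>,
): Promise<Result<void>> {
  let index = 0;
  try {
    for await (const item of iterable) {
      const result = await worker(item, index++);
      if (result[0] !== null) {
        return result; // Leaving the loop also closes the source iterator.
      }
    }
  } catch (thrown) {
    return [thrown instanceof Error ? thrown : new Error(String(thrown)), null];
  }
  return [null, undefined];
}

// Hypothetical source that records how many rows it has produced so far.
let produced = 0;
async function* readRowsSketch(total: number): AsyncGenerator<number> {
  for (let row = 0; row < total; row++) {
    produced++;
    yield row;
  }
}
```

If a worker fails on an early row, the source is never asked for the remaining rows, whereas the materializing loop above would have read all of them into memory before any processing began.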
