Yieldless
Recipes

Repository Indexer

A larger example that combines queues, pub/sub, caching, batching, limits, retries, task groups, and Node subprocesses.

This recipe shows a realistic in-process worker pipeline: a user selects repositories, the app indexes them, progress is streamed to the UI, remote metadata is cached, owner lookups are batched, and local Git subprocesses stay bounded.

The point is not to build a new runtime. The point is to keep the moving parts visible:

  • yieldless/queue accepts work with backpressure
  • yieldless/pubsub broadcasts progress
  • yieldless/task runs workers under one cancellation signal
  • yieldless/limiter protects API quota and local subprocess capacity
  • yieldless/cache avoids repeated metadata reads
  • yieldless/batcher collapses nearby owner lookups
  • yieldless/retry handles transient remote failures
  • yieldless/fetch keeps remote calls in tuple form
  • yieldless/node wraps local Git commands as tuples

The indexer

import { createBatcher } from "yieldless/batcher";
import { createCache } from "yieldless/cache";
import { HttpStatusError, fetchJsonSafe } from "yieldless/fetch";
import { createPubSub } from "yieldless/pubsub";
import { createQueue } from "yieldless/queue";
import {
  createRateLimiter,
  createSemaphore,
  withPermit,
} from "yieldless/limiter";
import { runCommandSafe } from "yieldless/node";
import { safeRetry } from "yieldless/retry";
import { runTaskGroup } from "yieldless/task";

interface RepositoryMetadata {
  readonly id: string;
  readonly ownerId: string;
  readonly path: string;
}

interface Owner {
  readonly id: string;
  readonly name: string;
}

interface IndexJob {
  readonly repositoryId: string;
}

type IndexEvent =
  | { readonly type: "queued"; readonly repositoryId: string }
  | { readonly type: "started"; readonly repositoryId: string }
  | {
      readonly type: "indexed";
      readonly repositoryId: string;
      readonly ownerName: string;
      readonly dirty: boolean;
    }
  | {
      readonly type: "failed";
      readonly repositoryId: string;
      readonly message: string;
    };

interface RepositoryIndexerOptions {
  readonly apiBaseUrl: string;
  readonly workerCount?: number;
}

function messageFrom(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

export function createRepositoryIndexer(options: RepositoryIndexerOptions) {
  const workerCount = options.workerCount ?? 4;
  const queue = createQueue<IndexJob>({ capacity: workerCount * 25 });
  const events = createPubSub<IndexEvent>({ replay: 25 });
  const apiQuota = createRateLimiter({ limit: 60, intervalMs: 60_000 });
  const gitSlots = createSemaphore(workerCount);

  async function apiGet<T>(path: string, signal: AbortSignal) {
    const [quotaError] = await apiQuota.takeSafe({ signal });

    if (quotaError) {
      return [quotaError, null] as const;
    }

    return await fetchJsonSafe<T>(`${options.apiBaseUrl}${path}`, {
      timeoutMs: 3_000,
      signal,
    });
  }

  const metadata = createCache<string, RepositoryMetadata>({
    ttlMs: 60_000,
    maxSize: 2_000,
    load: (repositoryId, signal) =>
      safeRetry(
        (_attempt, attemptSignal) =>
          apiGet<RepositoryMetadata>(
            `/repositories/${repositoryId}`,
            attemptSignal,
          ),
        {
          maxAttempts: 3,
          baseDelayMs: 150,
          shouldRetry: (error) =>
            !(error instanceof HttpStatusError) || error.status >= 500,
          signal,
        },
      ),
  });

  const owners = createBatcher<string, Owner>({
    waitMs: 2,
    maxBatchSize: 50,
    loadMany: async (ownerIds, signal) => {
      const ids = encodeURIComponent(ownerIds.join(","));
      const [error, values] = await apiGet<Owner[]>(`/owners?ids=${ids}`, signal);

      if (error) {
        return [error, null] as const;
      }

      const byId = new Map(values.map((owner) => [owner.id, owner]));

      return [
        null,
        ownerIds.map((id) => byId.get(id) ?? { id, name: "Unknown owner" }),
      ] as const;
    },
  });

  async function inspectGitStatus(
    repository: RepositoryMetadata,
    signal: AbortSignal,
  ) {
    return await withPermit(
      gitSlots,
      (scopedSignal) =>
        runCommandSafe("git", ["status", "--short"], {
          cwd: repository.path,
          signal: scopedSignal,
        }),
      { signal },
    );
  }

  async function processJob(job: IndexJob, signal: AbortSignal) {
    events.publish({
      type: "started",
      repositoryId: job.repositoryId,
    });

    const [metadataError, repository] = await metadata.get(job.repositoryId, {
      signal,
    });

    if (metadataError) {
      events.publish({
        type: "failed",
        repositoryId: job.repositoryId,
        message: messageFrom(metadataError),
      });
      return;
    }

    const [ownerError, owner] = await owners.load(repository.ownerId, {
      signal,
    });

    if (ownerError) {
      events.publish({
        type: "failed",
        repositoryId: job.repositoryId,
        message: messageFrom(ownerError),
      });
      return;
    }

    const [statusError, status] = await inspectGitStatus(repository, signal);

    if (statusError) {
      events.publish({
        type: "failed",
        repositoryId: job.repositoryId,
        message: messageFrom(statusError),
      });
      return;
    }

    events.publish({
      type: "indexed",
      repositoryId: repository.id,
      ownerName: owner.name,
      dirty: status.stdout.trim().length > 0,
    });
  }

  async function runWorker(signal: AbortSignal) {
    while (!signal.aborted) {
      const [takeError, job] = await queue.take({ signal });

      if (takeError) {
        return;
      }

      await processJob(job, signal);
    }
  }

  return {
    events,
    close(): void {
      queue.close();
    },
    async enqueue(repositoryId: string, signal?: AbortSignal) {
      const result = await queue.offer({ repositoryId }, { signal });

      if (result[0] === null) {
        events.publish({ type: "queued", repositoryId });
      }

      return result;
    },
    async run(signal?: AbortSignal): Promise<void> {
      try {
        await runTaskGroup(
          async (group) => {
            const workers = Array.from({ length: workerCount }, () =>
              group.spawn((workerSignal) => runWorker(workerSignal)),
            );

            await Promise.all(workers);
          },
          { signal },
        );
      } finally {
        events.close();
      }
    },
  };
}

Using it from a route or action

const indexer = createRepositoryIndexer({
  apiBaseUrl: "https://api.example.com",
  workerCount: 4,
});

const progress = indexer.events.subscribe();
const controller = new AbortController();
const running = indexer.run(controller.signal);

for (const repositoryId of selectedRepositoryIds) {
  const [enqueueError] = await indexer.enqueue(
    repositoryId,
    controller.signal,
  );

  if (enqueueError) {
    controller.abort(enqueueError);
    break;
  }
}

indexer.close();

for await (const event of progress) {
  renderIndexProgress(event);
}

await running;

In a real UI, the progress subscription often lives in a websocket, server-sent events stream, or Electron IPC handler. The shape stays the same: subscribe, render events, and abort when the user leaves.

Why this is easier to operate

  • The queue is bounded, so user-selected work cannot grow memory forever.
  • Worker count is explicit, so local Git subprocesses stay polite.
  • API quota waits happen before remote calls, not after the upstream complains.
  • Metadata reads are cached, but failed reads are not stored.
  • Owner lookups batch together while still returning one result per job.
  • Retry policy sits around the flaky remote read, not around the whole pipeline.
  • Progress is just an async subscription, so it can feed logs, UI, or tests.

Smaller variations

Process a fixed list without a long-running queue

If you already have the whole list and do not need progress fanout, mapLimit() is smaller.

import { mapLimit } from "yieldless/all";

const [error, summaries] = await mapLimit(
  selectedRepositoryIds,
  (repositoryId, _index, signal) => indexOneRepository(repositoryId, signal),
  {
    concurrency: 4,
    signal,
  },
);

Keep only the cache and batcher

If the pipeline is overkill, keep the read model helpers.

const repository = await metadata.get(repositoryId, { signal });
const owner = await owners.load(ownerId, { signal });

Add a circuit breaker to an optional dependency

If indexing can continue without feature flags or recommendations, wrap that optional remote call with a circuit breaker and use a fallback when it is open.

import { CircuitOpenError, createCircuitBreaker } from "yieldless/breaker";
import { fetchJsonSafe } from "yieldless/fetch";

const loadRecommendations = createCircuitBreaker(
  (signal, repositoryId: string) =>
    fetchJsonSafe<Recommendation[]>(`/recommendations/${repositoryId}`, {
      timeoutMs: 2_000,
      signal,
    }),
  {
    failureThreshold: 3,
    cooldownMs: 15_000,
  },
);

const [recommendationError, recommendations] =
  await loadRecommendations(repositoryId);

if (recommendationError instanceof CircuitOpenError) {
  return [null, []] as const;
}

if (recommendationError) {
  return [recommendationError, null] as const;
}

return [null, recommendations] as const;

Avoid: hiding the pipeline inside a generic worker framework

const indexer = makeRuntime({
  retry: true,
  cache: true,
  batch: true,
  queue: true,
  workers: 4,
});

That looks shorter, but the operational choices disappear. Yieldless works best when the important limits remain close to the work they protect.

On this page