Repository Indexer
A larger example that combines queues, pub/sub, caching, batching, limits, retries, task groups, and Node subprocesses.
This recipe shows a realistic in-process worker pipeline: a user selects repositories, the app indexes them, progress is streamed to the UI, remote metadata is cached, owner lookups are batched, and local Git subprocesses stay bounded.
The point is not to build a new runtime. The point is to keep the moving parts visible:
- `yieldless/queue` accepts work with backpressure
- `yieldless/pubsub` broadcasts progress
- `yieldless/task` runs workers under one cancellation signal
- `yieldless/limiter` protects API quota and local subprocess capacity
- `yieldless/cache` avoids repeated metadata reads
- `yieldless/batcher` collapses nearby owner lookups
- `yieldless/retry` handles transient remote failures
- `yieldless/fetch` keeps remote calls in tuple form
- `yieldless/node` wraps local Git commands as tuples
The indexer
import { createBatcher } from "yieldless/batcher";
import { createCache } from "yieldless/cache";
import { HttpStatusError, fetchJsonSafe } from "yieldless/fetch";
import { createPubSub } from "yieldless/pubsub";
import { createQueue } from "yieldless/queue";
import {
createRateLimiter,
createSemaphore,
withPermit,
} from "yieldless/limiter";
import { runCommandSafe } from "yieldless/node";
import { safeRetry } from "yieldless/retry";
import { runTaskGroup } from "yieldless/task";
/** Remote metadata for one repository, including its local checkout location. */
interface RepositoryMetadata {
  readonly id: string;
  readonly ownerId: string;
  /** Local filesystem path; used as the cwd for Git subprocess calls. */
  readonly path: string;
}
/** Owner record as returned by the batched `/owners?ids=...` lookup. */
interface Owner {
  readonly id: string;
  readonly name: string;
}
/** One unit of work on the indexing queue: a single repository to index. */
interface IndexJob {
  readonly repositoryId: string;
}
/**
 * Progress events published while indexing:
 * - "queued":  the job was accepted onto the queue
 * - "started": a worker picked the job up
 * - "indexed": metadata, owner, and Git status all resolved successfully
 * - "failed":  some step returned an error; `message` is human-readable
 */
type IndexEvent =
  | { readonly type: "queued"; readonly repositoryId: string }
  | { readonly type: "started"; readonly repositoryId: string }
  | {
      readonly type: "indexed";
      readonly repositoryId: string;
      readonly ownerName: string;
      /** True when `git status --short` produced any output (uncommitted changes). */
      readonly dirty: boolean;
    }
  | {
      readonly type: "failed";
      readonly repositoryId: string;
      readonly message: string;
    };
/** Configuration for {@link createRepositoryIndexer}. */
interface RepositoryIndexerOptions {
  /** Base URL; request paths such as "/repositories/:id" are appended directly. */
  readonly apiBaseUrl: string;
  /** Worker pool size (default 4); also sizes queue capacity and the Git semaphore. */
  readonly workerCount?: number;
}
/** Extracts a human-readable message from an unknown thrown/returned value. */
function messageFrom(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  return String(error);
}
/**
 * Builds an in-process repository indexing pipeline: a bounded job queue,
 * a worker pool under one cancellation scope, a progress pub/sub channel,
 * a TTL cache for repository metadata, a batched owner lookup, and a
 * semaphore that bounds concurrent local Git subprocesses.
 *
 * @param options - API base URL and optional worker count (defaults to 4).
 * @returns `{ events, close, enqueue, run }` — subscribe to `events` for
 *   progress, `enqueue` jobs (with backpressure), `run` the worker pool,
 *   and `close` the queue to let workers drain and exit.
 */
export function createRepositoryIndexer(options: RepositoryIndexerOptions) {
  const workerCount = options.workerCount ?? 4;
  // Bounded queue: offers apply backpressure once 25 jobs per worker pile up.
  const queue = createQueue<IndexJob>({ capacity: workerCount * 25 });
  // Progress channel; `replay: 25` presumably buffers recent events for late
  // subscribers — confirm against yieldless/pubsub docs.
  const events = createPubSub<IndexEvent>({ replay: 25 });
  // Remote API quota: at most 60 calls per rolling minute.
  const apiQuota = createRateLimiter({ limit: 60, intervalMs: 60_000 });
  // One Git-subprocess permit per worker keeps local process count bounded.
  const gitSlots = createSemaphore(workerCount);
  // Quota-gated JSON GET against the remote API, in [error, value] tuple form.
  // The quota wait happens BEFORE the request so we never exceed the limit.
  async function apiGet<T>(path: string, signal: AbortSignal) {
    const [quotaError] = await apiQuota.takeSafe({ signal });
    if (quotaError) {
      return [quotaError, null] as const;
    }
    return await fetchJsonSafe<T>(`${options.apiBaseUrl}${path}`, {
      timeoutMs: 3_000,
      signal,
    });
  }
  // Repository metadata cache (60s TTL, 2000 entries). Each cache load is
  // wrapped in retry so transient remote failures are retried per-load.
  const metadata = createCache<string, RepositoryMetadata>({
    ttlMs: 60_000,
    maxSize: 2_000,
    load: (repositoryId, signal) =>
      safeRetry(
        (_attempt, attemptSignal) =>
          apiGet<RepositoryMetadata>(
            `/repositories/${repositoryId}`,
            attemptSignal,
          ),
        {
          maxAttempts: 3,
          baseDelayMs: 150,
          // Retry anything that is not an HTTP status error (e.g. network
          // failures) plus 5xx responses; never retry 4xx client errors.
          shouldRetry: (error) =>
            !(error instanceof HttpStatusError) || error.status >= 500,
          signal,
        },
      ),
  });
  // Collapses owner lookups issued within a 2ms window (up to 50 ids) into
  // one request, then returns exactly one Owner per requested id, in order.
  const owners = createBatcher<string, Owner>({
    waitMs: 2,
    maxBatchSize: 50,
    loadMany: async (ownerIds, signal) => {
      const ids = encodeURIComponent(ownerIds.join(","));
      const [error, values] = await apiGet<Owner[]>(`/owners?ids=${ids}`, signal);
      if (error) {
        return [error, null] as const;
      }
      const byId = new Map(values.map((owner) => [owner.id, owner]));
      return [
        null,
        // Placeholder keeps the 1:1 id→result contract when the API omits an id.
        ownerIds.map((id) => byId.get(id) ?? { id, name: "Unknown owner" }),
      ] as const;
    },
  });
  // Runs `git status --short` in the repository's checkout, holding one
  // semaphore permit so at most `workerCount` Git processes run at once.
  async function inspectGitStatus(
    repository: RepositoryMetadata,
    signal: AbortSignal,
  ) {
    return await withPermit(
      gitSlots,
      (scopedSignal) =>
        runCommandSafe("git", ["status", "--short"], {
          cwd: repository.path,
          signal: scopedSignal,
        }),
      { signal },
    );
  }
  // Full lifecycle of one job: started → (metadata → owner → git status) →
  // indexed, publishing a "failed" event and stopping at the first error.
  async function processJob(job: IndexJob, signal: AbortSignal) {
    events.publish({
      type: "started",
      repositoryId: job.repositoryId,
    });
    const [metadataError, repository] = await metadata.get(job.repositoryId, {
      signal,
    });
    if (metadataError) {
      events.publish({
        type: "failed",
        repositoryId: job.repositoryId,
        message: messageFrom(metadataError),
      });
      return;
    }
    const [ownerError, owner] = await owners.load(repository.ownerId, {
      signal,
    });
    if (ownerError) {
      events.publish({
        type: "failed",
        repositoryId: job.repositoryId,
        message: messageFrom(ownerError),
      });
      return;
    }
    const [statusError, status] = await inspectGitStatus(repository, signal);
    if (statusError) {
      events.publish({
        type: "failed",
        repositoryId: job.repositoryId,
        message: messageFrom(statusError),
      });
      return;
    }
    events.publish({
      type: "indexed",
      repositoryId: repository.id,
      ownerName: owner.name,
      // Any `git status --short` output means uncommitted local changes.
      dirty: status.stdout.trim().length > 0,
    });
  }
  // One worker: take jobs until the queue closes (take returns an error) or
  // the shared cancellation signal aborts.
  async function runWorker(signal: AbortSignal) {
    while (!signal.aborted) {
      const [takeError, job] = await queue.take({ signal });
      if (takeError) {
        return;
      }
      await processJob(job, signal);
    }
  }
  return {
    events,
    // Closing the queue lets in-flight workers drain and then exit.
    close(): void {
      queue.close();
    },
    // Offers a job with backpressure; "queued" is published only on success.
    async enqueue(repositoryId: string, signal?: AbortSignal) {
      const result = await queue.offer({ repositoryId }, { signal });
      if (result[0] === null) {
        events.publish({ type: "queued", repositoryId });
      }
      return result;
    },
    // Runs the worker pool under one task group (shared cancellation), and
    // always closes the event stream so subscribers terminate.
    async run(signal?: AbortSignal): Promise<void> {
      try {
        await runTaskGroup(
          async (group) => {
            const workers = Array.from({ length: workerCount }, () =>
              group.spawn((workerSignal) => runWorker(workerSignal)),
            );
            await Promise.all(workers);
          },
          { signal },
        );
      } finally {
        events.close();
      }
    },
  };
}

Using it from a route or action
const indexer = createRepositoryIndexer({
apiBaseUrl: "https://api.example.com",
workerCount: 4,
});
const progress = indexer.events.subscribe();
const controller = new AbortController();
const running = indexer.run(controller.signal);
for (const repositoryId of selectedRepositoryIds) {
const [enqueueError] = await indexer.enqueue(
repositoryId,
controller.signal,
);
if (enqueueError) {
controller.abort(enqueueError);
break;
}
}
indexer.close();
for await (const event of progress) {
renderIndexProgress(event);
}
await running;In a real UI, the progress subscription often lives in a websocket, server-sent events stream, or Electron IPC handler. The shape stays the same: subscribe, render events, and abort when the user leaves.
Why this is easier to operate
- The queue is bounded, so user-selected work cannot grow memory forever.
- Worker count is explicit, so local Git subprocesses stay polite.
- API quota waits happen before remote calls, not after the upstream complains.
- Metadata reads are cached, but failed reads are not stored.
- Owner lookups batch together while still returning one result per job.
- Retry policy sits around the flaky remote read, not around the whole pipeline.
- Progress is just an async subscription, so it can feed logs, UI, or tests.
Smaller variations
Process a fixed list without a long-running queue
If you already have the whole list and do not need progress fanout, mapLimit() is smaller.
import { mapLimit } from "yieldless/all";
const [error, summaries] = await mapLimit(
selectedRepositoryIds,
(repositoryId, _index, signal) => indexOneRepository(repositoryId, signal),
{
concurrency: 4,
signal,
},
);

Keep only the cache and batcher
If the pipeline is overkill, keep the read model helpers.
const [metadataError, repository] = await metadata.get(repositoryId, { signal });
const [ownerError, owner] = await owners.load(ownerId, { signal });

Add a circuit breaker to an optional dependency
If indexing can continue without feature flags or recommendations, wrap that optional remote call with a circuit breaker and use a fallback when it is open.
import { CircuitOpenError, createCircuitBreaker } from "yieldless/breaker";
import { fetchJsonSafe } from "yieldless/fetch";
const loadRecommendations = createCircuitBreaker(
(signal, repositoryId: string) =>
fetchJsonSafe<Recommendation[]>(`/recommendations/${repositoryId}`, {
timeoutMs: 2_000,
signal,
}),
{
failureThreshold: 3,
cooldownMs: 15_000,
},
);
const [recommendationError, recommendations] =
await loadRecommendations(repositoryId);
if (recommendationError instanceof CircuitOpenError) {
return [null, []] as const;
}
if (recommendationError) {
return [recommendationError, null] as const;
}
return [null, recommendations] as const;

Avoid: hiding the pipeline inside a generic worker framework
const indexer = makeRuntime({
retry: true,
cache: true,
batch: true,
queue: true,
workers: 4,
});

That looks shorter, but the operational choices disappear. Yieldless works best when the important limits remain close to the work they protect.