Recipes
Customer Import
Import a customer CSV with validation, bounded concurrency, duplicate checks, progress events, and polite API calls.
Bulk imports are familiar to almost every product team. They look simple until a customer uploads a file with thousands of rows, duplicate emails, and invalid data, and the CRM behind the import is slow.
This recipe keeps the import understandable:
- validate each row as data
- check duplicates in batches
- limit outbound CRM calls
- publish progress as rows complete
- stop cleanly when the user cancels
Import service
import { createBatcher } from "yieldless/batcher";
import { safeTry, safeTrySync } from "yieldless/error";
import { fetchJsonSafe } from "yieldless/fetch";
import { mapLimit } from "yieldless/all";
import { createRateLimiter } from "yieldless/limiter";
import { createPubSub } from "yieldless/pubsub";
import { parseSafe } from "yieldless/schema";
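// customerRowSchema, customerRepository, and parseCustomerCsv are app-level
// helpers this recipe assumes you already have; they are not shown here.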
interface CsvCustomerRow {
readonly email: string;
readonly name: string;
readonly plan: "free" | "pro" | "enterprise";
}
type ImportEvent =
| { readonly type: "started"; readonly total: number }
| { readonly type: "row-imported"; readonly row: number; readonly email: string }
| { readonly type: "row-skipped"; readonly row: number; readonly reason: string }
| { readonly type: "finished"; readonly imported: number; readonly skipped: number };
export function createCustomerImporter() {
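  // Progress events for the UI; they are published as rows are processed.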
const events = createPubSub<ImportEvent>({ replay: 25 });
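  // Keep CRM calls polite: no more than 120 requests per 60 seconds.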
const crmQuota = createRateLimiter({ limit: 120, intervalMs: 60_000 });
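  // Batch duplicate-email lookups: rows arriving within 2ms share one
  // repository query, up to 250 emails at a time.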
const existingCustomers = createBatcher<string, boolean>({
waitMs: 2,
maxBatchSize: 250,
loadMany: async (emails, signal) => {
const [error, existing] = await customerRepository.existsByEmail(
emails,
signal,
);
if (error) {
return [error, null] as const;
}
const existingSet = new Set(existing);
return [null, emails.map((email) => existingSet.has(email))] as const;
},
});
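  // Import a single CSV row. Invalid and duplicate rows count as skips;
  // only infrastructure failures are returned as errors.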
async function importRow(
row: unknown,
index: number,
signal: AbortSignal,
) {
const rowNumber = index + 1;
const [parseError, customer] = parseSafe(customerRowSchema, row);
if (parseError) {
events.publish({
type: "row-skipped",
row: rowNumber,
reason: "Invalid customer data",
});
return [null, { imported: 0, skipped: 1 }] as const;
}
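    // The duplicate check goes through the batcher, so concurrent rows share one query.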
const [duplicateError, exists] = await existingCustomers.load(
customer.email,
{ signal },
);
if (duplicateError) {
return [duplicateError, null] as const;
}
if (exists) {
events.publish({
type: "row-skipped",
row: rowNumber,
reason: "Customer already exists",
});
return [null, { imported: 0, skipped: 1 }] as const;
}
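    // Take a rate-limiter slot before the outbound CRM call.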
const [quotaError] = await crmQuota.takeSafe({ signal });
if (quotaError) {
return [quotaError, null] as const;
}
const [crmError] = await fetchJsonSafe("/crm/customers", {
method: "POST",
body: JSON.stringify(customer),
headers: { "content-type": "application/json" },
timeoutMs: 4_000,
signal,
});
if (crmError) {
return [crmError, null] as const;
}
events.publish({
type: "row-imported",
row: rowNumber,
email: customer.email,
});
return [null, { imported: 1, skipped: 0 }] as const;
}
return {
events,
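    // Read and parse the file, then import every row with bounded
    // concurrency, publishing progress as it goes.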
async importCsv(file: File, signal: AbortSignal) {
const [readError, text] = await safeTry(file.text());
if (readError) {
return [readError, null] as const;
}
const [parseError, rows] = safeTrySync(() => parseCustomerCsv(text));
if (parseError) {
return [parseError, null] as const;
}
events.publish({ type: "started", total: rows.length });
const [importError, results] = await mapLimit(
rows,
importRow,
{
concurrency: 8,
signal,
},
);
if (importError) {
return [importError, null] as const;
}
const summary = results.reduce(
(total, row) => ({
imported: total.imported + row.imported,
skipped: total.skipped + row.skipped,
}),
{ imported: 0, skipped: 0 },
);
events.publish({ type: "finished", ...summary });
return [null, summary] as const;
},
};
}
Streaming progress to a UI
const importer = createCustomerImporter();
const progress = importer.events.subscribe();
const controller = new AbortController();
const running = importer.importCsv(uploadedFile, controller.signal);
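// Close the subscription once the import settles so the for-await loop below ends.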
void running.finally(() => progress.close());
for await (const event of progress) {
updateImportScreen(event);
}
const [error, summary] = await running;
The UI does not need to know about batching, rate limiting, or CRM failures. It gets ordinary domain events.
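What updateImportScreen does is up to the app. A minimal sketch, assuming a plain DOM status element and local counters (both illustrative); only the ImportEvent shapes come from the recipe above.
// Illustrative UI wiring: a status element and counters; swap in your own rendering.
const statusElement = document.querySelector("#import-status")!;
let imported = 0;
let skipped = 0;

function updateImportScreen(event: ImportEvent): void {
  switch (event.type) {
    case "started":
      statusElement.textContent = `Importing ${event.total} rows…`;
      break;
    case "row-imported":
      imported += 1;
      statusElement.textContent = `${imported} imported, ${skipped} skipped`;
      break;
    case "row-skipped":
      skipped += 1;
      statusElement.textContent = `${imported} imported, ${skipped} skipped`;
      break;
    case "finished":
      statusElement.textContent = `Done: ${event.imported} imported, ${event.skipped} skipped`;
      break;
  }
}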
Why this is a good fit
- Invalid rows become skipped rows instead of crashing the whole import.
- Duplicate checks batch together instead of hitting the database once per row.
- CRM calls run with bounded concurrency and a rate limit.
- A single AbortSignal lets the user cancel the import from the UI (see the sketch after this list).
- Progress is an async subscription, so it works for web sockets, server-sent events, Electron IPC, or tests.
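The cancellation wiring is one line of UI glue. A sketch, reusing the controller from the streaming example and assuming a hypothetical cancel button in the page:
const cancelButton = document.querySelector("#cancel-import")!;

// Aborting the shared signal stops pending rows in mapLimit, the duplicate
// batcher, the rate limiter wait, and any in-flight CRM request.
cancelButton.addEventListener("click", () => controller.abort());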
Avoid: one promise per row
await Promise.all(rows.map((row) => importCustomer(row)));
That pattern looks tidy until a customer uploads 20,000 rows and all of them start at once. Use mapLimit() when the input is customer-sized.
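The bounded version is barely longer. A sketch that mirrors the recipe above, with importCustomer standing in for whatever per-row work you already have:
// Same safe-tuple result as in the recipe; at most 8 rows are in flight at once.
const [error, results] = await mapLimit(rows, importCustomer, {
  concurrency: 8,
  signal,
});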