Customer Import

Import a customer CSV with validation, bounded concurrency, duplicate checks, progress events, and polite API calls.

Bulk imports are familiar to almost every product team. They look simple until a customer uploads a file with thousands of rows, duplicate emails, and invalid data, and every row has to flow through a slow CRM integration.

This recipe keeps the import understandable:

  • validate each row as data
  • check duplicates in batches
  • limit outbound CRM calls
  • publish progress as rows complete
  • stop cleanly when the user cancels

Import service

import { createBatcher } from "yieldless/batcher";
import { safeTry, safeTrySync } from "yieldless/error";
import { fetchJsonSafe } from "yieldless/fetch";
import { mapLimit } from "yieldless/all";
import { createRateLimiter } from "yieldless/limiter";
import { createPubSub } from "yieldless/pubsub";
import { parseSafe } from "yieldless/schema";
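
// customerRowSchema, parseCustomerCsv, and customerRepository are
// application-specific helpers assumed to be defined elsewhere in your
// codebase; they are not part of Yieldless.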

interface CsvCustomerRow {
  readonly email: string;
  readonly name: string;
  readonly plan: "free" | "pro" | "enterprise";
}

type ImportEvent =
  | { readonly type: "started"; readonly total: number }
  | { readonly type: "row-imported"; readonly row: number; readonly email: string }
  | { readonly type: "row-skipped"; readonly row: number; readonly reason: string }
  | { readonly type: "finished"; readonly imported: number; readonly skipped: number };

export function createCustomerImporter() {
  const events = createPubSub<ImportEvent>({ replay: 25 });
  const crmQuota = createRateLimiter({ limit: 120, intervalMs: 60_000 });

  const existingCustomers = createBatcher<string, boolean>({
    waitMs: 2,
    maxBatchSize: 250,
    loadMany: async (emails, signal) => {
      const [error, existing] = await customerRepository.existsByEmail(
        emails,
        signal,
      );

      if (error) {
        return [error, null] as const;
      }

      const existingSet = new Set(existing);

      return [null, emails.map((email) => existingSet.has(email))] as const;
    },
  });

  async function importRow(
    row: unknown,
    index: number,
    signal: AbortSignal,
  ) {
    const rowNumber = index + 1;
    const [parseError, customer] = parseSafe(customerRowSchema, row);

    if (parseError) {
      events.publish({
        type: "row-skipped",
        row: rowNumber,
        reason: "Invalid customer data",
      });
      return [null, { imported: 0, skipped: 1 }] as const;
    }

    const [duplicateError, exists] = await existingCustomers.load(
      customer.email,
      { signal },
    );

    if (duplicateError) {
      return [duplicateError, null] as const;
    }

    if (exists) {
      events.publish({
        type: "row-skipped",
        row: rowNumber,
        reason: "Customer already exists",
      });
      return [null, { imported: 0, skipped: 1 }] as const;
    }

    const [quotaError] = await crmQuota.takeSafe({ signal });

    if (quotaError) {
      return [quotaError, null] as const;
    }

    const [crmError] = await fetchJsonSafe("/crm/customers", {
      method: "POST",
      body: JSON.stringify(customer),
      headers: { "content-type": "application/json" },
      timeoutMs: 4_000,
      signal,
    });

    if (crmError) {
      return [crmError, null] as const;
    }

    events.publish({
      type: "row-imported",
      row: rowNumber,
      email: customer.email,
    });

    return [null, { imported: 1, skipped: 0 }] as const;
  }

  return {
    events,
    async importCsv(file: File, signal: AbortSignal) {
      const [readError, text] = await safeTry(file.text());

      if (readError) {
        return [readError, null] as const;
      }

      const [parseError, rows] = safeTrySync(() => parseCustomerCsv(text));

      if (parseError) {
        return [parseError, null] as const;
      }

      events.publish({ type: "started", total: rows.length });

      const [importError, results] = await mapLimit(
        rows,
        importRow,
        {
          concurrency: 8,
          signal,
        },
      );

      if (importError) {
        return [importError, null] as const;
      }

      const summary = results.reduce(
        (total, row) => ({
          imported: total.imported + row.imported,
          skipped: total.skipped + row.skipped,
        }),
        { imported: 0, skipped: 0 },
      );

      events.publish({ type: "finished", ...summary });

      return [null, summary] as const;
    },
  };
}

Streaming progress to a UI

const importer = createCustomerImporter();
const progress = importer.events.subscribe();
const controller = new AbortController();

const running = importer.importCsv(uploadedFile, controller.signal);
void running.finally(() => progress.close());

for await (const event of progress) {
  updateImportScreen(event);
}

const [error, summary] = await running;

The UI does not need to know about batching, rate limiting, or CRM failures. It gets ordinary domain events.
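
To make that concrete, here is one possible shape for updateImportScreen. It is a minimal sketch that assumes the ImportEvent union above is exported from a hypothetical "./customer-importer" module and that the import screen has a status element and a log element; only the event shapes come from the recipe.

import type { ImportEvent } from "./customer-importer";

function updateImportScreen(event: ImportEvent): void {
  const status = document.querySelector("#import-status");
  const log = document.querySelector("#import-log");

  switch (event.type) {
    case "started":
      if (status) status.textContent = `Importing ${event.total} rows`;
      break;
    case "row-imported":
      log?.append(`Row ${event.row}: imported ${event.email}\n`);
      break;
    case "row-skipped":
      log?.append(`Row ${event.row}: skipped (${event.reason})\n`);
      break;
    case "finished":
      if (status) {
        status.textContent = `Finished: ${event.imported} imported, ${event.skipped} skipped`;
      }
      break;
  }
}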

Why this is a good fit

  • Invalid rows become skipped rows instead of crashing the whole import.
  • Duplicate checks batch together instead of hitting the database once per row.
  • CRM calls run with bounded concurrency and a rate limit.
  • A single AbortSignal lets the user cancel the import from the UI (see the sketch after this list).
  • Progress is an async subscription, so it works for web sockets, server-sent events, Electron IPC, or tests.
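
Here is a sketch of that cancellation wiring. The button id is illustrative, and it assumes an aborted run surfaces as an error tuple from importCsv, consistent with the error-tuple style used throughout the recipe.

const importer = createCustomerImporter();
const controller = new AbortController();

// Hypothetical cancel button wired to the same controller that importCsv receives.
document.querySelector("#cancel-import")?.addEventListener("click", () => {
  controller.abort();
});

const [error, summary] = await importer.importCsv(uploadedFile, controller.signal);

if (error) {
  // Cancellation and CRM failures both arrive here as values, not thrown exceptions.
  console.warn("Import did not finish:", error);
} else {
  console.log(`Imported ${summary.imported}, skipped ${summary.skipped}`);
}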

Avoid: one promise per row

await Promise.all(rows.map((row) => importCustomer(row)));

That pattern looks tidy until a customer uploads 20,000 rows and all of them run at once. Use mapLimit() whenever the customer decides how big the input is.
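
A bounded sketch of the same loop, reusing mapLimit and safeTry from the recipe. importCustomer is the same function as in the snippet above, and wrapping it with safeTry is an assumption made here to keep the callback in the recipe's error-tuple style.

import { mapLimit } from "yieldless/all";
import { safeTry } from "yieldless/error";

// Same work, but at most 8 rows in flight at any moment. A signal can be
// threaded through the options object as in the recipe above.
const [error, results] = await mapLimit(
  rows,
  (row) => safeTry(importCustomer(row)),
  { concurrency: 8 },
);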
