
diff API

Record-level change detection between datasets supplied as Arrow IPC chunks. All diff computation runs in WASM, without materializing rows as JavaScript objects. Diff is available on Scale tier or higher.

```bash
npm install @rowops/diff
```

Data Contract (Arrow IPC)

  • Input datasets are Arrow IPC chunk arrays (Uint8Array[]); each chunk is a complete IPC stream.
  • Output is an Arrow IPC stream of diff rows, plus a summary computed with diff_counts.
  • No row arrays or JSON datasets cross the WASM boundary.

```ts
import type { ImportJob } from "@rowops/import-core";

async function runDiff(job: ImportJob, previousVersionIpcChunks: Uint8Array[]) {
  const result = await job.diff({
    // Dataset to compare against, as Arrow IPC chunks
    compareDatasetIpcChunks: previousVersionIpcChunks,
    // Column(s) used to match records between the two datasets
    keyFields: ["id"],
    // Optional fuzzy matching on selected fields
    fuzzy: {
      enabled: true,
      fields: ["firstName", "lastName", "email"],
      threshold: 0.85,
    },
    onProgress: (pct, message) => console.log(pct, message),
  });

  console.log(result.added, result.removed, result.updated);
  return result;
}
```
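Because each chunk must be a complete IPC stream, a cheap pre-flight check can catch obviously malformed input before it reaches `job.diff()`. The sketch below is a hypothetical helper (`checkIpcChunks` is not part of `@rowops`). It assumes chunks use the current Arrow IPC framing, in which each encapsulated message starts with the `0xFFFFFFFF` continuation marker followed by a little-endian int32 metadata length, and that the first message of a stream is the schema. It inspects only those framing bytes; it does not validate schemas or record batches.

```typescript
// Hypothetical pre-flight check; only framing bytes are inspected.
// Assumes the post-0.15 Arrow IPC framing: 0xFFFFFFFF continuation marker,
// then a little-endian int32 metadata length.
const CONTINUATION_MARKER = 0xffffffff;

interface ChunkIssue {
  index: number;
  reason: string;
}

function checkIpcChunks(chunks: Uint8Array[]): ChunkIssue[] {
  const issues: ChunkIssue[] = [];
  chunks.forEach((chunk, index) => {
    // Runtime guard for untyped (JavaScript) callers.
    if (!(chunk instanceof Uint8Array)) {
      issues.push({ index, reason: "not a Uint8Array" });
      return;
    }
    // Marker (4 bytes) + metadata length (4 bytes) is the minimum.
    if (chunk.byteLength < 8) {
      issues.push({ index, reason: "too short to hold an IPC message" });
      return;
    }
    const view = new DataView(chunk.buffer, chunk.byteOffset, chunk.byteLength);
    if (view.getUint32(0, true) !== CONTINUATION_MARKER) {
      issues.push({ index, reason: "missing leading continuation marker" });
      return;
    }
    // The first message of a stream is the schema, so its metadata length
    // must be positive (a bare end-of-stream marker has length 0).
    if (view.getInt32(4, true) <= 0) {
      issues.push({ index, reason: "first message has no metadata (expected a schema)" });
    }
  });
  return issues;
}
```

Running it on `previousVersionIpcChunks` and reporting any returned issues before calling `job.diff()` turns a confusing engine error into a specific, per-chunk message.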

Local Baseline Diff (Importer)

Local baseline Diff stores the latest baseline in IndexedDB and compares the current cleaned dataset to it. It returns counts plus a list of changed keys. No field-level deltas or before/after values are emitted. Running Diff requires Scale tier or higher.

```ts
import { saveLocalBaseline, runLocalBaselineDiff } from "@rowops/diff";
import { createDiffBaselineStore } from "@rowops/data-layer";
import type { Entitlements, ProcessedChunkRecord } from "@rowops/core";

async function saveBaseline(chunks: AsyncIterable<ProcessedChunkRecord>) {
  const store = createDiffBaselineStore();
  await store.open();

  return saveLocalBaseline({
    scope: { projectId: "proj_123", schemaId: "schema_contacts" },
    keySpec: { columns: ["id"] },
    sourceStage: "validate",
    chunks,
    store,
  });
}

async function runBaselineDiff(
  chunks: AsyncIterable<ProcessedChunkRecord>,
  entitlements: Entitlements
) {
  const store = createDiffBaselineStore();
  await store.open();

  const result = await runLocalBaselineDiff({
    scope: { projectId: "proj_123", schemaId: "schema_contacts" },
    keySpec: { columns: ["id"] },
    chunks,
    store,
    entitlements,
  });

  if (result.status === "complete") {
    console.log(result.summary, result.changes?.length);
  }
}
```

Local Baseline Diff Types

| Type | Description |
| --- | --- |
| DiffKeySpec | `{ columns: string[] }` (single or composite key) |
| BaselineMeta | Baseline metadata and hashing version info |
| DiffSummary | Counts for added/removed/changed/unchanged, plus ambiguous keys |
| DiffResultPage | Paged list of changed keys (added/removed/changed) |
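A DiffKeySpec can name one column or several. When records are compared by a composite key, the key values need an unambiguous encoding. The sketch below shows one way to do that with a hypothetical `encodeKey` helper; it is an illustration, not the library's internal key hashing (which BaselineMeta versions separately). JSON-encoding the array of values keeps `["a|b", "c"]` and `["a", "b|c"]` distinct, which naive string joining would not.

```typescript
// Mirrors the documented DiffKeySpec shape.
interface DiffKeySpec {
  columns: string[];
}

type Row = Record<string, unknown>;

// Hypothetical helper: one string key per row, built from the key columns.
// JSON encoding of the value array avoids separator collisions.
function encodeKey(row: Row, spec: DiffKeySpec): string {
  if (spec.columns.length === 0) {
    throw new Error("DiffKeySpec.columns must name at least one column");
  }
  const values = spec.columns.map((column) => {
    const value = row[column];
    // Design choice: treat a missing key part the same as null.
    return value === undefined ? null : value;
  });
  // Note: numbers and strings encode differently (7 vs "7"),
  // and BigInt values would make JSON.stringify throw.
  return JSON.stringify(values);
}
```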

Local Baseline Diff Statuses

  • complete: Summary and change list are available
  • no_baseline: No local baseline saved for this scope
  • key_mismatch: Baseline key spec differs from current selection
  • hash_mismatch: The baseline was saved with a different hashing version than the current one
  • unsupported: Row count exceeds the local baseline size cap
  • tier_blocked: Diff is not available for the current tier
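Since only `complete` carries a summary, callers typically branch on every status. The sketch below mirrors the documented status strings in a local union type (the real result type comes from `@rowops/diff`) and maps each one to a follow-up action; `nextStep` and the action texts are hypothetical. The `never` check makes the compiler flag any status that lacks a case.

```typescript
// Local mirror of the documented statuses.
type LocalBaselineDiffStatus =
  | "complete"
  | "no_baseline"
  | "key_mismatch"
  | "hash_mismatch"
  | "unsupported"
  | "tier_blocked";

// Hypothetical mapping from status to what an importer UI might do next.
function nextStep(status: LocalBaselineDiffStatus): string {
  switch (status) {
    case "complete":
      return "show summary and changed keys";
    case "no_baseline":
      return "save a baseline first";
    case "key_mismatch":
      return "re-select the baseline key columns or save a new baseline";
    case "hash_mismatch":
      return "save a new baseline with the current hashing version";
    case "unsupported":
      return "dataset exceeds the local baseline size cap";
    case "tier_blocked":
      return "upgrade to a tier that includes Diff";
    default: {
      // Compile-time exhaustiveness check.
      const unreachable: never = status;
      return unreachable;
    }
  }
}
```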

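Local baseline Diff keeps only the latest baseline per scope (project + schema) and fails closed above the size cap (default 2,000,000 rows), returning unsupported instead of a partial result. Ambiguous keys (keys that occur more than once) are reported in the summary but excluded from the added/removed/changed/unchanged totals.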
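The counting rules above can be stated precisely as a small reference model. This is a sketch, not the library implementation: `modelBaselineDiff` is hypothetical, each side is assumed to be a list of `[key, fingerprint]` pairs (how keys and row fingerprints are computed is out of scope), and a key duplicated on either side is assumed to count as ambiguous.

```typescript
interface Summary {
  added: number;
  removed: number;
  changed: number;
  unchanged: number;
  ambiguous: number;
}

interface ModelResult {
  status: "complete" | "unsupported";
  summary?: Summary;
}

const DEFAULT_ROW_CAP = 2_000_000;

// Count occurrences of each key and remember the first fingerprint seen.
function tallyByKey(rows: Array<[string, string]>): Map<string, { count: number; fingerprint: string }> {
  const byKey = new Map<string, { count: number; fingerprint: string }>();
  for (const [key, fingerprint] of rows) {
    const entry = byKey.get(key);
    if (entry) entry.count += 1;
    else byKey.set(key, { count: 1, fingerprint });
  }
  return byKey;
}

function modelBaselineDiff(
  baseline: Array<[string, string]>,
  current: Array<[string, string]>,
  rowCap: number = DEFAULT_ROW_CAP,
): ModelResult {
  // Fail closed: no partial summary above the cap.
  if (baseline.length > rowCap || current.length > rowCap) {
    return { status: "unsupported" };
  }
  const before = tallyByKey(baseline);
  const after = tallyByKey(current);

  // Assumption: a key duplicated on either side is ambiguous.
  const ambiguous = new Set<string>();
  before.forEach((entry, key) => { if (entry.count > 1) ambiguous.add(key); });
  after.forEach((entry, key) => { if (entry.count > 1) ambiguous.add(key); });

  const summary: Summary = { added: 0, removed: 0, changed: 0, unchanged: 0, ambiguous: ambiguous.size };
  after.forEach((entry, key) => {
    if (ambiguous.has(key)) return; // excluded from the four totals
    const old = before.get(key);
    if (!old) summary.added += 1;
    else if (old.fingerprint === entry.fingerprint) summary.unchanged += 1;
    else summary.changed += 1;
  });
  before.forEach((_entry, key) => {
    if (!ambiguous.has(key) && !after.has(key)) summary.removed += 1;
  });
  return { status: "complete", summary };
}
```

Because ambiguous keys are removed before counting, added + changed + unchanged equals the number of distinct unambiguous keys in the current dataset.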


Worker API (DIFF_ARROW)

```ts
import { createDiffWorker, createArrowWorkerRequest, prepareWorkerInputs } from "@rowops/core";
import { resolveBrowserLicense } from "@rowops/import-core";

async function runDiffWorker(datasetAChunks: Uint8Array[], datasetBChunks: Uint8Array[]) {
  const worker = createDiffWorker();
  const { tierGateInit } = await resolveBrowserLicense({
    projectId: "proj_xxx",
    entitlementToken: "eyJ...",
  });

  // Prepare each dataset's chunks and collect the buffers to transfer
  const { chunksToSend: ipcChunksA, transferList: transferA } =
    prepareWorkerInputs(datasetAChunks);
  const { chunksToSend: ipcChunksB, transferList: transferB } =
    prepareWorkerInputs(datasetBChunks);

  const request = createArrowWorkerRequest("DIFF_ARROW", {
    ipcChunksA,
    ipcChunksB,
    keyFields: ["id"],
    fuzzy: { enabled: true, fields: ["email"], threshold: 0.9 },
    tierGate: tierGateInit,
  });

  // Attach the listener before posting the request
  worker.onmessage = (event) => {
    const msg = event.data;

    if (msg.type === "PROGRESS") {
      console.log(msg.payload.progress, msg.payload.message);
      return;
    }

    if (msg.status === "COMPLETE") {
      const { resultIpc, summary } = msg.payload;
      console.log(summary);
      // resultIpc is Arrow IPC with diff rows
    }
  };

  // Buffers in the transfer list are moved to the worker rather than copied
  worker.postMessage(request, [...transferA, ...transferB]);
}
```
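The worker example handles results in a callback. If you prefer `await`, the listener can be wrapped in a Promise. This is a hypothetical helper (`waitForDiff` and `WorkerLike` are not part of `@rowops/core`). It relies on the `PROGRESS` and `COMPLETE` message shapes used in the example above; treating a message with `status: "ERROR"` as a failure is an assumption, since error messages are not documented on this page.

```typescript
interface DiffWorkerResult {
  resultIpc: Uint8Array;
  summary: unknown;
}

// Minimal structural type so the sketch does not depend on DOM typings.
interface WorkerLike {
  onmessage: ((event: any) => void) | null;
}

// Hypothetical helper: resolves on COMPLETE, forwards PROGRESS,
// rejects on an assumed "ERROR" status.
function waitForDiff(
  worker: WorkerLike,
  onProgress?: (progress: number, message: string) => void,
): Promise<DiffWorkerResult> {
  return new Promise((resolve, reject) => {
    worker.onmessage = (event) => {
      const msg = event.data;
      if (msg.type === "PROGRESS") {
        onProgress?.(msg.payload.progress, msg.payload.message);
        return;
      }
      if (msg.status === "COMPLETE") {
        worker.onmessage = null; // stop listening once the job is done
        resolve({ resultIpc: msg.payload.resultIpc, summary: msg.payload.summary });
        return;
      }
      if (msg.status === "ERROR") {
        worker.onmessage = null;
        reject(new Error(msg.payload?.message ?? "diff worker failed"));
      }
    };
  });
}
```

Call `waitForDiff(worker)` before `worker.postMessage(...)`, then `await` the returned promise.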

Result IPC Schema

The diff output is an Arrow IPC stream with these columns (a trailing ? marks a nullable type):

| Column | Type | Description |
| --- | --- | --- |
| row_index_b | `u32` | Row index in dataset B |
| row_index_a | `u32?` | Matched row index in dataset A (null for added rows) |
| kind | `u8` | 0 = added, 1 = removed, 2 = updated, 3 = unchanged, 4 = conflict |
| score | `f64?` | Similarity score for fuzzy matches (null for exact matches) |
| diff_fields | `list<utf8>?` | Names of fields that changed (null for non-updated rows) |
| diff_old_values | `list<utf8?>?` | Old values aligned to diff_fields (null for non-updated rows) |
| diff_new_values | `list<utf8?>?` | New values aligned to diff_fields (null for non-updated rows) |
| old_row | `struct<...>?` | Full old row payload (null for added rows) |
| new_row | `struct<...>?` | Full new row payload (null for removed rows) |

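Because row_index_b is not nullable, removed rows (which have no row in dataset B) use row_index_b = 0 as a placeholder and carry a non-null row_index_a; use row_index_a to locate them. For updated rows, the three diff_* lists are aligned by index: diff_old_values[i] and diff_new_values[i] are the old and new values of diff_fields[i]. Row payloads use the original dataset schemas (A for old_row, B for new_row).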
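Decoding a result row involves two of the rules above: the placeholder row_index_b on removed rows and the index alignment of the diff_* lists. The sketch works on plain values already read out of one row (reading the IPC stream itself, for example with an Arrow library, is outside its scope). `kindLabel`, `locateRow`, and `zipFieldChanges` are hypothetical helpers.

```typescript
// Kind codes as documented for the `kind` column.
const DIFF_KINDS = ["added", "removed", "updated", "unchanged", "conflict"] as const;
type DiffKind = (typeof DIFF_KINDS)[number];

function kindLabel(code: number): DiffKind {
  const label = DIFF_KINDS[code];
  if (label === undefined) throw new Error(`unknown kind code ${code}`);
  return label;
}

// Removed rows carry a placeholder row_index_b of 0, so use row_index_a.
function locateRow(
  kind: number,
  rowIndexA: number | null,
  rowIndexB: number,
): { dataset: "A" | "B"; index: number } {
  if (kindLabel(kind) === "removed") {
    if (rowIndexA === null) throw new Error("removed row without row_index_a");
    return { dataset: "A", index: rowIndexA };
  }
  return { dataset: "B", index: rowIndexB };
}

interface FieldChange {
  field: string;
  oldValue: string | null;
  newValue: string | null;
}

// Pair diff_fields[i] with diff_old_values[i] and diff_new_values[i].
function zipFieldChanges(
  fields: string[] | null,
  oldValues: (string | null)[] | null,
  newValues: (string | null)[] | null,
): FieldChange[] {
  // Non-updated rows carry null lists.
  if (fields === null) return [];
  if (
    oldValues === null || newValues === null ||
    oldValues.length !== fields.length || newValues.length !== fields.length
  ) {
    throw new Error("diff value lists are not aligned with diff_fields");
  }
  return fields.map((field, i) => ({ field, oldValue: oldValues[i], newValue: newValues[i] }));
}
```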


Configuration Types

DiffConfig

```ts
interface DiffConfig {
  keyFields: string[];
}
```

FuzzyConfig

```ts
interface FuzzyConfig {
  enabled: boolean;
  fields: string[];
  threshold: number;
  weights?: Record<string, number>;
}
```

Note: fuzzy scoring currently weights every field in fields equally. The optional weights map is reserved for future tuning and has no effect yet.
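Equal weighting means the record score is the plain mean of per-field similarities, compared against `threshold`. The per-field similarity metric is not documented here, so the sketch below uses normalized Levenshtein similarity as a stand-in (an assumption, not necessarily what the engine uses). `fuzzyScore`, `isFuzzyMatch`, and `levenshteinSimilarity` are hypothetical names, and `weights` is ignored to match the note above.

```typescript
interface FuzzyConfig {
  enabled: boolean;
  fields: string[];
  threshold: number;
  weights?: Record<string, number>;
}

type Row = Record<string, string | null | undefined>;

// Stand-in metric (assumption): 1 - editDistance / maxLength, in [0, 1].
function levenshteinSimilarity(a: string, b: string): number {
  if (a === b) return 1; // also covers two empty strings
  const maxLen = Math.max(a.length, b.length);
  // Two-row dynamic programming over edit distances.
  let prev = Array.from({ length: b.length + 1 }, (_, j) => j);
  for (let i = 1; i <= a.length; i++) {
    const curr = [i];
    for (let j = 1; j <= b.length; j++) {
      const cost = a[i - 1] === b[j - 1] ? 0 : 1;
      curr[j] = Math.min(prev[j] + 1, curr[j - 1] + 1, prev[j - 1] + cost);
    }
    prev = curr;
  }
  return 1 - prev[b.length] / maxLen;
}

// Equal weighting: plain mean over config.fields; `weights` is not read.
// Missing values are compared as empty strings.
function fuzzyScore(a: Row, b: Row, config: FuzzyConfig): number {
  if (config.fields.length === 0) return 0;
  let total = 0;
  for (const field of config.fields) {
    total += levenshteinSimilarity(a[field] ?? "", b[field] ?? "");
  }
  return total / config.fields.length;
}

function isFuzzyMatch(a: Row, b: Row, config: FuzzyConfig): boolean {
  return config.enabled && fuzzyScore(a, b, config) >= config.threshold;
}
```

For example, with fields `["firstName", "email"]`, "Ann" vs "Anne" scores 0.75 and identical emails score 1, so the mean of 0.875 clears a threshold of 0.85.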


Summary Counts (Low-Level WASM)

```ts
import { initWasm, diff_counts } from "@rowops/engine-wasm";

async function summarize(resultIpc: Uint8Array) {
  await initWasm();
  const counts = diff_counts(resultIpc);
  const [added, removed, updated, unchanged, conflicts] = counts;
  return { added, removed, updated, unchanged, conflicts };
}
```
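For tests, it can help to recompute the same five counts without WASM. The sketch assumes `diff_counts` tallies result rows by their `kind` code; this page documents only the output order, which lines up with kind codes 0 to 4. `tallyKinds` is a hypothetical helper that accepts any array-like of codes, including a `Uint8Array` holding the u8 `kind` column.

```typescript
// Same order as the destructuring above: [added, removed, updated, unchanged, conflicts].
type DiffCounts = [number, number, number, number, number];

function tallyKinds(kinds: ArrayLike<number>): DiffCounts {
  const counts: DiffCounts = [0, 0, 0, 0, 0];
  for (let i = 0; i < kinds.length; i++) {
    const kind = kinds[i];
    // Reject anything outside the documented codes 0..4.
    if (!Number.isInteger(kind) || kind < 0 || kind > 4) {
      throw new Error(`unexpected kind code ${kind}`);
    }
    counts[kind] += 1;
  }
  return counts;
}
```

Comparing `tallyKinds(kindColumn)` with `diff_counts(resultIpc)` element by element is a simple consistency check.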

Tier Behavior

  • Free: diff disabled
  • Pro: exact key matching only
  • Scale and Enterprise: fuzzy matching + conflict detection

Row limits: Pro 100,000; Scale 1,000,000; Enterprise unlimited (Free disabled).
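Gating the UI before starting a diff avoids a round trip to the worker. The sketch below encodes the tier list and row limits above as a hypothetical client-side pre-check (`checkTier`, `Tier`, and `DiffRequest` are illustrative names); the engine still enforces the tier gate itself. It assumes the row limit applies to the larger of the two input datasets, which this page does not specify.

```typescript
type Tier = "free" | "pro" | "scale" | "enterprise";

interface DiffRequest {
  // Assumption: the larger of the two input row counts.
  rowCount: number;
  fuzzyEnabled: boolean;
}

// Row limits from the list above; 0 = disabled, Infinity = unlimited.
const ROW_LIMITS: Record<Tier, number> = {
  free: 0,
  pro: 100_000,
  scale: 1_000_000,
  enterprise: Number.POSITIVE_INFINITY,
};

// Hypothetical pre-check; the tierGate passed to the engine remains authoritative.
function checkTier(tier: Tier, request: DiffRequest): { ok: true } | { ok: false; reason: string } {
  if (tier === "free") return { ok: false, reason: "diff is disabled on Free" };
  if (request.fuzzyEnabled && tier === "pro") {
    return { ok: false, reason: "Pro supports exact key matching only" };
  }
  if (request.rowCount > ROW_LIMITS[tier]) {
    return { ok: false, reason: `row count exceeds the ${tier} limit of ${ROW_LIMITS[tier]}` };
  }
  return { ok: true };
}
```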


Utility Functions

computeDiffSummary

Aggregate in-memory diff results into summary statistics.

```ts
import type { DiffResult } from "@rowops/diff";
import { computeDiffSummary } from "@rowops/diff";

function summarize(results: DiffResult[]) {
  const summary = computeDiffSummary(results);
  console.log(summary.added, summary.updated);
  return summary;
}
```

See Also