diff API
Record-level change detection between datasets using Arrow IPC chunks. All diff computation runs in WASM and avoids JS materialization. Diff is available on Scale tier or higher.
npm install @rowops/diff
Data Contract (Arrow IPC)
- Input datasets are Arrow IPC chunk arrays (Uint8Array[]); each chunk is a complete IPC stream.
- Output is Arrow IPC with diff rows plus a summary from diff_counts.
- No row arrays or JSON datasets cross the WASM boundary.
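Because every chunk must be a complete IPC stream, a cheap pre-flight check can catch truncated or mis-sliced buffers before they reach the worker. The sketch below is not part of @rowops/diff; `looksLikeIpcStream` and `assertIpcChunks` are hypothetical helpers. It assumes the chunks use the current Arrow encapsulated message framing, in which a stream opens with the 4-byte continuation marker 0xFFFFFFFF followed by a little-endian int32 metadata length. Streams written in the older pre-0.15 framing would fail this check.

```typescript
// Hypothetical helpers (not a @rowops API): sanity-check Arrow IPC chunk arrays.
const CONTINUATION_MARKER = 0xffffffff;

function looksLikeIpcStream(chunk: Uint8Array): boolean {
  // Need at least the 4-byte marker plus the 4-byte metadata length.
  if (chunk.byteLength < 8) return false;
  const view = new DataView(chunk.buffer, chunk.byteOffset, chunk.byteLength);
  // Both values are little-endian in the encapsulated message format.
  const marker = view.getUint32(0, true);
  const metadataLength = view.getInt32(4, true);
  // The first message of a stream is the schema, so its metadata is non-empty.
  return marker === CONTINUATION_MARKER && metadataLength > 0;
}

function assertIpcChunks(chunks: Uint8Array[]): void {
  chunks.forEach((chunk, index) => {
    if (!looksLikeIpcStream(chunk)) {
      throw new Error(`Chunk ${index} does not start like an Arrow IPC stream`);
    }
  });
}
```

This only inspects the first eight bytes; it does not validate the schema or the record batches that follow.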
Recommended Usage (ImportJob)
import type { ImportJob } from "@rowops/import-core";
async function runDiff(job: ImportJob, previousVersionIpcChunks: Uint8Array[]) {
  const result = await job.diff({
    compareDatasetIpcChunks: previousVersionIpcChunks,
    keyFields: ["id"],
    fuzzy: {
      enabled: true,
      fields: ["firstName", "lastName", "email"],
      threshold: 0.85,
    },
    onProgress: (pct, message) => console.log(pct, message),
  });
  console.log(result.added, result.removed, result.updated);
  return result;
}
Local Baseline Diff (Importer)
Local baseline Diff stores the latest baseline in IndexedDB and compares the current cleaned dataset to it. It returns counts plus a list of changed keys. No field-level deltas or before/after values are emitted. Running Diff requires Scale tier or higher.
import { saveLocalBaseline, runLocalBaselineDiff } from "@rowops/diff";
import { createDiffBaselineStore } from "@rowops/data-layer";
import type { Entitlements, ProcessedChunkRecord } from "@rowops/core";
async function saveBaseline(chunks: AsyncIterable<ProcessedChunkRecord>) {
  const store = createDiffBaselineStore();
  await store.open();
  return saveLocalBaseline({
    scope: { projectId: "proj_123", schemaId: "schema_contacts" },
    keySpec: { columns: ["id"] },
    sourceStage: "validate",
    chunks,
    store,
  });
}
async function runBaselineDiff(
  chunks: AsyncIterable<ProcessedChunkRecord>,
  entitlements: Entitlements
) {
  const store = createDiffBaselineStore();
  await store.open();
  const result = await runLocalBaselineDiff({
    scope: { projectId: "proj_123", schemaId: "schema_contacts" },
    keySpec: { columns: ["id"] },
    chunks,
    store,
    entitlements,
  });
  if (result.status === "complete") {
    console.log(result.summary, result.changes?.length);
  }
}
Local Baseline Diff Types
| Type | Description |
|---|---|
| DiffKeySpec | { columns: string[] } (single or composite key) |
| BaselineMeta | Baseline metadata and hashing version info |
| DiffSummary | Counts for added/removed/changed/unchanged plus ambiguous keys |
| DiffResultPage | Paged changed key list (added/removed/changed) |
Local Baseline Diff Statuses
- complete: Summary and change list are available
- no_baseline: No local baseline saved for this scope
- key_mismatch: Baseline key spec differs from current selection
- hash_mismatch: Baseline hash version mismatch
- unsupported: Row count exceeds the local baseline size cap
- tier_blocked: Diff is not available for the current tier
Local baseline Diff is latest-only per scope (project + schema) and fails closed above the size cap (default 2,000,000 rows). Ambiguous keys (duplicates) are excluded from added/removed/changed/unchanged totals.
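One way to keep callers honest about the non-complete statuses is an exhaustive switch. The following is a minimal sketch: the status strings come from the list above, while the `LocalBaselineDiffStatus` type name, the `nextStepFor` helper, and the suggested follow-up actions are illustrative assumptions, not exports or guidance from @rowops/diff.

```typescript
// Status strings are from the list above; the type name and helper are hypothetical.
type LocalBaselineDiffStatus =
  | "complete"
  | "no_baseline"
  | "key_mismatch"
  | "hash_mismatch"
  | "unsupported"
  | "tier_blocked";

function nextStepFor(status: LocalBaselineDiffStatus): string {
  switch (status) {
    case "complete":
      return "Render the summary and the changed keys.";
    case "no_baseline":
      return "Save a baseline for this scope, then rerun Diff.";
    case "key_mismatch":
      return "Reuse the key columns of the baseline or save a new baseline.";
    case "hash_mismatch":
      return "Save a new baseline so the hash versions match.";
    case "unsupported":
      return "Reduce the row count below the local baseline size cap.";
    case "tier_blocked":
      return "Diff is not available on the current tier.";
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a status is added.
      const unreachable: never = status;
      throw new Error(`Unhandled status: ${String(unreachable)}`);
    }
  }
}
```

The `never` assignment in the default branch makes the compiler flag any status that is added later without a matching case.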
Worker API (DIFF_ARROW)
import { createDiffWorker, createArrowWorkerRequest, prepareWorkerInputs } from "@rowops/core";
import { resolveBrowserLicense } from "@rowops/import-core";
async function runDiffWorker(datasetAChunks: Uint8Array[], datasetBChunks: Uint8Array[]) {
  const worker = createDiffWorker();
  const { tierGateInit } = await resolveBrowserLicense({
    projectId: "proj_xxx",
    entitlementToken: "eyJ...",
  });
  const { chunksToSend: ipcChunksA, transferList: transferA } =
    prepareWorkerInputs(datasetAChunks);
  const { chunksToSend: ipcChunksB, transferList: transferB } =
    prepareWorkerInputs(datasetBChunks);
  const request = createArrowWorkerRequest("DIFF_ARROW", {
    ipcChunksA,
    ipcChunksB,
    keyFields: ["id"],
    fuzzy: { enabled: true, fields: ["email"], threshold: 0.9 },
    tierGate: tierGateInit,
  });
  // Attach the handler before posting so no early message is missed.
  worker.onmessage = (event) => {
    const msg = event.data;
    if (msg.type === "PROGRESS") {
      console.log(msg.payload.progress, msg.payload.message);
      return;
    }
    if (msg.status === "COMPLETE") {
      const { resultIpc, summary } = msg.payload;
      console.log(summary);
      // resultIpc is Arrow IPC with diff rows
    }
  };
  worker.postMessage(request, [...transferA, ...transferB]);
}
Result IPC Schema
The diff output is an Arrow IPC stream with these columns:
| Column | Type | Description |
|---|---|---|
| row_index_b | u32 | Row index in dataset B |
| row_index_a | u32? | Matched row index in dataset A (null for added) |
| kind | u8 | 0=added, 1=removed, 2=updated, 3=unchanged, 4=conflict |
| score | f64? | Similarity score for fuzzy matches (null for exact) |
| diff_fields | list<utf8>? | Field names that changed (null for non-updated rows) |
| diff_old_values | list<utf8?>? | Old values aligned to diff_fields (null for non-updated rows) |
| diff_new_values | list<utf8?>? | New values aligned to diff_fields (null for non-updated rows) |
| old_row | struct<...>? | Full old row payload (null for added rows) |
| new_row | struct<...>? | Full new row payload (null for removed rows) |
Removed rows use row_index_b = 0 and a non-null row_index_a.
For updated rows, diff field/value lists are aligned by index.
Row payloads use the original dataset schemas (A for old_row, B for new_row).
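Once a result row has been read out of the IPC stream (with whatever Arrow reader the application already uses), the kind code and the three aligned lists usually need to be turned into something friendlier. The sketch below assumes the row values are already plain JavaScript values; `decodeKind`, `zipFieldChanges`, and `FieldChange` are hypothetical names, not part of the package. The code-to-label mapping follows the kind column in the table above.

```typescript
// Labels are indexed by the kind codes from the schema table (0..4).
const DIFF_KINDS = ["added", "removed", "updated", "unchanged", "conflict"] as const;
type DiffKind = (typeof DIFF_KINDS)[number];

function decodeKind(code: number): DiffKind {
  const kind: DiffKind | undefined = DIFF_KINDS[code];
  if (kind === undefined) throw new RangeError(`Unknown diff kind code: ${code}`);
  return kind;
}

// Hypothetical shape for one field-level change.
interface FieldChange {
  field: string;
  oldValue: string | null;
  newValue: string | null;
}

// Pair up diff_fields, diff_old_values, and diff_new_values by index.
function zipFieldChanges(
  fields: string[] | null,
  oldValues: (string | null)[] | null,
  newValues: (string | null)[] | null
): FieldChange[] {
  // The lists are null for rows that are not updated.
  if (fields === null || oldValues === null || newValues === null) return [];
  if (fields.length !== oldValues.length || fields.length !== newValues.length) {
    throw new Error("diff lists are not aligned");
  }
  return fields.map((field, i) => ({
    field,
    oldValue: oldValues[i] ?? null,
    newValue: newValues[i] ?? null,
  }));
}
```

For example, `zipFieldChanges(["email"], ["a@old.io"], ["a@new.io"])` yields a single entry describing the email change, and any row whose lists are null yields an empty array.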
Configuration Types
DiffConfig
interface DiffConfig {
  keyFields: string[];
}
FuzzyConfig
interface FuzzyConfig {
  enabled: boolean;
  fields: string[];
  threshold: number;
  weights?: Record<string, number>;
}
Note: scoring currently uses equal weighting across fields. weights is reserved for future tuning.
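To make the equal-weighting note concrete, here is a rough sketch of how per-field similarities could be combined and compared against threshold. It is an illustration, not the engine's code: the per-field similarity metric is not specified in this document, so the sketch takes precomputed scores in [0, 1] as input. The `combinedScore` and `isFuzzyMatch` names, the treatment of missing fields as 0, and the inclusive `>=` comparison are all assumptions.

```typescript
interface FuzzyConfig {
  enabled: boolean;
  fields: string[];
  threshold: number;
  weights?: Record<string, number>; // reserved; ignored here, as in the note above
}

// Equal weighting: the combined score is the plain mean of the per-field scores.
function combinedScore(fieldScores: Record<string, number>, config: FuzzyConfig): number {
  if (config.fields.length === 0) return 0;
  let total = 0;
  for (const field of config.fields) {
    // Assumption: a field with no score contributes zero similarity.
    total += fieldScores[field] ?? 0;
  }
  return total / config.fields.length;
}

// Assumption: a candidate pair matches when the score reaches the threshold (inclusive).
function isFuzzyMatch(fieldScores: Record<string, number>, config: FuzzyConfig): boolean {
  return config.enabled && combinedScore(fieldScores, config) >= config.threshold;
}
```

With fields firstName, lastName, and email scoring 1.0, 0.8, and 0.9, the mean is about 0.9, which would clear a threshold of 0.85 but not 0.95.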
Summary Counts (Low-Level WASM)
import { initWasm, diff_counts } from "@rowops/engine-wasm";
async function summarize(resultIpc: Uint8Array) {
  await initWasm();
  const counts = diff_counts(resultIpc);
  // Counts are ordered: added, removed, updated, unchanged, conflicts.
  const [added, removed, updated, unchanged, conflicts] = counts;
  return { added, removed, updated, unchanged, conflicts };
}
Tier Behavior
- Free: diff disabled
- Pro: exact key matching only
- Scale and Enterprise: fuzzy matching + conflict detection
Row limits: Pro 100,000; Scale 1,000,000; Enterprise unlimited (Free disabled).
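For client-side checks before a diff is even attempted, the tier rules in this section can be written as a lookup table. The sketch below mirrors only the list and row limits stated here; the lowercase tier identifiers, the `TierDiffPolicy` shape, and the `canRunDiff` helper are hypothetical, and treating the row limit as inclusive is an assumption. The tier gate in the engine remains the source of truth.

```typescript
// Hypothetical tier identifiers and policy shape; values mirror the list above.
type Tier = "free" | "pro" | "scale" | "enterprise";

interface TierDiffPolicy {
  diffEnabled: boolean;
  fuzzyMatching: boolean;
  conflictDetection: boolean;
  maxRows: number;
}

const TIER_DIFF_POLICY: Record<Tier, TierDiffPolicy> = {
  free: { diffEnabled: false, fuzzyMatching: false, conflictDetection: false, maxRows: 0 },
  pro: { diffEnabled: true, fuzzyMatching: false, conflictDetection: false, maxRows: 100000 },
  scale: { diffEnabled: true, fuzzyMatching: true, conflictDetection: true, maxRows: 1000000 },
  enterprise: { diffEnabled: true, fuzzyMatching: true, conflictDetection: true, maxRows: Infinity },
};

function canRunDiff(tier: Tier, rowCount: number, wantsFuzzy: boolean): boolean {
  const policy = TIER_DIFF_POLICY[tier];
  if (!policy.diffEnabled) return false;
  if (wantsFuzzy && !policy.fuzzyMatching) return false;
  // Assumption: the documented limit is inclusive.
  return rowCount <= policy.maxRows;
}
```

A check like this is only a convenience for showing a message early; it does not replace the entitlement passed to the worker or to runLocalBaselineDiff.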
Utility Functions
computeDiffSummary
Aggregate in-memory diff results into summary statistics.
import type { DiffResult } from "@rowops/diff";
import { computeDiffSummary } from "@rowops/diff";
function summarize(results: DiffResult[]) {
  const summary = computeDiffSummary(results);
  console.log(summary.added, summary.updated);
  return summary;
}