How we improved the performance of our Salesforce to Postgres syncs

Our customers use Supaglue to sync up to tens of millions of records from their customers’ CRMs to their own databases. The CRM data is often used to power AI and other user-facing product experiences, so it’s critical that the syncs happen quickly and reliably.

In this post, we discuss the initial implementation of our Salesforce-to-Postgres pipeline and how we significantly improved its speed through various performance optimizations.

The initial implementation

In the early days of Supaglue, we wanted to ship a working end-to-end product as quickly as possible. To move fast, the initial Salesforce → Postgres sync implementation was quite naive. It looked something like this:

1. Issue a request to Salesforce’s REST API (which uses SOQL, Salesforce’s SQL-like query language) for all contacts. For example:

curl /services/data/vXX.X/query?q=SELECT+Id,FirstName,LastName,Email,...+FROM+Contact


2. The response returns one page of records along with a cursor for the next page of results. We map the records into a call to our Postgres ORM, Prisma, which generates a SQL statement to batch-upsert those records.

3. Repeat steps 1-2 for all pages.

While this is a very common pattern and was easy to generalize to other CRM providers, it took too long to sync data for customers with millions of CRM records.
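
To make that concrete, here’s a minimal sketch of the loop, assuming hypothetical instanceUrl/accessToken plumbing and a crmContact Prisma model (the real code also needed auth and error handling, which are omitted here):

import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

async function syncContacts(instanceUrl: string, accessToken: string): Promise<void> {
  const soql = 'SELECT Id, FirstName, LastName, Email FROM Contact';
  let url: string | undefined = `${instanceUrl}/services/data/vXX.X/query?q=${encodeURIComponent(soql)}`;

  while (url) {
    // 1. Fetch one page of records from the REST API.
    const response = await fetch(url, { headers: { Authorization: `Bearer ${accessToken}` } });
    const page: any = await response.json();

    // 2. Batch-upsert the page of records through the ORM.
    await prisma.$transaction(
      page.records.map((record: any) =>
        prisma.crmContact.upsert({
          where: { id: record.Id },
          create: { id: record.Id, firstName: record.FirstName, lastName: record.LastName, email: record.Email },
          update: { firstName: record.FirstName, lastName: record.LastName, email: record.Email },
        })
      )
    );

    // 3. Follow the cursor to the next page until there isn't one.
    url = page.nextRecordsUrl ? `${instanceUrl}${page.nextRecordsUrl}` : undefined;
  }
}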

Bulk API 2.0

Salesforce’s Bulk API 2.0 is the recommended way to query large amounts of data. It processes SOQL queries asynchronously, breaking the work into several phases:

1. First, submit a query job, which returns a jobId:

curl -X POST /services/data/vXX.X/jobs/query -H "Content-Type: application/json" -d '{"operation":"query","query":"SELECT Id,FirstName,LastName,Email FROM Contact"}'

2. Then, repeatedly poll for the status of the query job:

curl /services/data/vXX.X/jobs/query/<jobId>

3. When the job completes, you can issue requests to download the records, which Salesforce divides into chunks (think of them as pages); Salesforce decides the size of each chunk. The records are in the response body, and the Sforce-Locator response header holds the cursor for the next chunk:

curl /services/data/vXX.X/jobs/query/<jobId>/results?locator=<locator> -H "Accept: text/csv"


The chunks of data were often very large (we regularly saw 200k+ records per chunk), so we broke each chunk into smaller batches to upsert into Postgres using Prisma.
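
Stitched together, the flow looked roughly like the sketch below. Error handling and the batching/upsert step are elided, and bulkQueryContacts, instanceUrl, and accessToken are placeholder names:

async function bulkQueryContacts(instanceUrl: string, accessToken: string): Promise<void> {
  const headers = { Authorization: `Bearer ${accessToken}` };

  // 1. Submit the query job.
  const createResponse = await fetch(`${instanceUrl}/services/data/vXX.X/jobs/query`, {
    method: 'POST',
    headers: { ...headers, 'Content-Type': 'application/json' },
    body: JSON.stringify({
      operation: 'query',
      query: 'SELECT Id, FirstName, LastName, Email FROM Contact',
    }),
  });
  const { id: jobId } = await createResponse.json();

  // 2. Poll until the job completes.
  let state = 'InProgress';
  while (state !== 'JobComplete') {
    if (state === 'Failed' || state === 'Aborted') {
      throw new Error(`Bulk query job ${jobId} ended in state ${state}`);
    }
    await new Promise((resolve) => setTimeout(resolve, 5000));
    const statusResponse = await fetch(`${instanceUrl}/services/data/vXX.X/jobs/query/${jobId}`, { headers });
    ({ state } = await statusResponse.json());
  }

  // 3. Download the results chunk by chunk, following the Sforce-Locator header.
  let locator: string | undefined;
  do {
    const url = `${instanceUrl}/services/data/vXX.X/jobs/query/${jobId}/results${locator ? `?locator=${locator}` : ''}`;
    const resultsResponse = await fetch(url, { headers: { ...headers, Accept: 'text/csv' } });
    const csv = await resultsResponse.text(); // one chunk of records, in CSV format

    // ...parse `csv`, break it into smaller batches, and upsert them with Prisma, as before...

    const nextLocator = resultsResponse.headers.get('Sforce-Locator');
    locator = nextLocator && nextLocator !== 'null' ? nextLocator : undefined;
  } while (locator);
}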

While this was more efficient than the naive paging solution, it had some issues:

  • If multiple syncs ran at the same time, we could end up reading millions of records into memory at once, and the sync process could run out of memory.
  • We were still bottlenecked by the batching of writes to Postgres.

Streaming from Salesforce to Postgres

There’s no reason to wait until an entire chunk of data has been read into memory from Bulk API 2.0 before starting to upsert records in batches into Postgres. So, we decided to stream the data from Salesforce into Postgres. The high-level flow looked something like this:

  1. Consume the Bulk API 2.0 response body (in CSV format) as a stream.
  2. Perform some transformations (like mapping to a unified model).
  3. Convert the records into CSV format.
  4. COPY the records into a temporary table in Postgres.
  5. Merge the new records into the main table using INSERT INTO ... ON CONFLICT ... DO UPDATE SET ...

Let’s explore these steps in detail.

Consume Bulk API 2.0 response body as a stream

In our TypeScript/Node.js codebase, we had been using axios to issue requests to Salesforce. Unfortunately, axios didn’t give us a good way to consume the response body as a stream, so we moved to the native fetch API instead. Here’s some sample code for how to do that:

import { parse } from 'csv-parse';
import { pipeline, Readable, Transform } from 'stream';

const parser = parse({
  columns: true,
});

async function processBulk2QueryJobResults(locator?: string): Promise<Readable> {
  // `jobId` comes from the query job submitted earlier
  const url = `/services/data/vXX.X/jobs/query/${jobId}/results${locator ? `?locator=${locator}` : ''}`;
  const response = await fetch(url, {
    headers: { Accept: 'text/csv' },
  });
  return pipeline(
    Readable.fromWeb(response.body),
    parser,
    () => {}
  );
}

Notes:

  • The csv-parse library provides a nice parser Transform that we use to parse the CSV response body into JSON objects.
  • Readable.fromWeb is needed because fetch returns a Web stream, while Node.js uses its own stream interface; conceptually, they are the same thing.
  • pipeline moves data through a series of streams. As we read data from the fetch response body, we write it into the parser, which consumes CSV lines and emits JSON objects that are easier to manipulate further down the pipeline.

There’s plenty of nuance and complexity to discuss with streams in Node.js, but that should be saved for another post.

Perform transformations

To map to our common schema, we chain another Transform onto the pipeline, like this:

// ...

const mapper = new Transform({
  objectMode: true,
  transform: (chunk, encoding, callback) => {
    const mappedRecord = applyMapping(chunk);
    callback(null, mappedRecord);
  },
});

async function processBulk2QueryJobResults(locator?: string): Promise<Readable> {
  // ...
  return pipeline(
    Readable.fromWeb(response.body),
    parser,
    mapper,
    () => {}
  );
}
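
For context, applyMapping is just a function that renames provider-specific fields into our unified model. A hypothetical, hard-coded version for contacts might look like this (field and column names are illustrative only):

// Hypothetical mapping from raw Salesforce column names to a unified contact model.
function applyMapping(record: Record<string, string>) {
  return {
    id: record.Id,
    first_name: record.FirstName,
    last_name: record.LastName,
    email: record.Email,
    last_modified_at: record.SystemModstamp,
  };
}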

Convert records into CSV format

We need to map the records back into CSV format to make use of Postgres’ COPY command (we’ll talk about that next). We found a nice CSV stringifier with a streaming API that takes the incoming Salesforce records and formats them for Postgres’ COPY:

import { stringify } from 'csv-stringify';

// ...

const stringifier = stringify({
  columns: [ /* columns */ ],
  cast: {
    // ...
  },
  quoted: true,
});

async function processBulk2QueryJobResults(locator?: string): Promise<Readable> {
  // ...
  return pipeline(
    Readable.fromWeb(response.body),
    parser,
    mapper,
    stringifier,
    () => {}
  );
}
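
The cast option (elided above) is where JavaScript values get coerced into strings that Postgres’ COPY will accept. As an illustration only, not our exact configuration, csv-stringify lets you supply a casting function per value type:

const stringifier = stringify({
  columns: [ /* columns */ ],
  cast: {
    // illustrative casts; the real configuration depends on the target schema
    boolean: (value) => (value ? 'true' : 'false'),
    date: (value) => value.toISOString(),
    object: (value) => JSON.stringify(value),
  },
  quoted: true,
});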

Use Postgres’ COPY command

The recommended way to ingest large amounts of data into Postgres is the COPY command. Postgres can COPY data from stdin or a file in a variety of formats, including CSV. We cannot COPY directly into the main table, however, since COPY cannot upsert: we only want the latest version of each record, not one row per version of it. Instead, we stream the data into a temporary table with a schema identical to that of the main table, using the pg and pg-copy-streams Node.js modules. Temporary tables are convenient here because they are only visible to the current session.

import { Pool, PoolClient } from 'pg';
import { from as copyFrom } from 'pg-copy-streams';
// pipeline now comes from 'stream/promises' (instead of 'stream') so that we can await completion
import { pipeline } from 'stream/promises';

const pool = new Pool();

const tempTable = `temp_table`;
const columns = [ /* columns */ ];

// ...

async function processBulk2QueryJobResults(locator?: string): Promise<void> {
  // ...
  const client = await pool.connect();
  const stream = client.query(
    copyFrom(`COPY ${tempTable} (${columns.join(',')}) FROM STDIN WITH (DELIMITER ',', FORMAT CSV)`)
  );
  await pipeline(
    Readable.fromWeb(response.body),
    parser,
    mapper,
    stringifier,
    stream
  );
}
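
One detail elided above is creating the temporary table itself. A minimal sketch, assuming a main table named contacts, is to clone the main table’s schema at the start of the sync:

// Create a session-scoped temporary table with the same columns as the (hypothetical) contacts table.
await client.query(`CREATE TEMP TABLE ${tempTable} (LIKE contacts INCLUDING DEFAULTS)`);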

Merge records into main table

Now that our records have been streamed into the temporary table, we need to upsert them into the main table. We used an INSERT INTO ... ON CONFLICT ... DO UPDATE SET ... SQL query to upsert the records:

// ...

async function processBulk2QueryJobResults(locator?: string): Promise<void> {
  // ...
  await pipeline(
    Readable.fromWeb(response.body),
    parser,
    mapper,
    stringifier,
    stream
  );

  await client.query(`INSERT INTO ... ON CONFLICT ... DO UPDATE SET ...`);
}
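
For illustration, with a hypothetical contacts table keyed on id, the full merge statement might look like this (a real query would list every mapped column):

await client.query(`
  INSERT INTO contacts (id, first_name, last_name, email)
  SELECT id, first_name, last_name, email
  FROM ${tempTable}
  ON CONFLICT (id)
  DO UPDATE SET
    first_name = EXCLUDED.first_name,
    last_name  = EXCLUDED.last_name,
    email      = EXCLUDED.email
`);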

Next steps

While we’re happy with our sync performance today, we’d like to explore strategies to improve it further. For example:

  • Pipelining requests: With Salesforce’s Bulk API 2.0, the records in the response body can take a while to download, but the “next page” locator arrives immediately in the response header. We could therefore pipeline the request for the next page while we’re still streaming the current page’s records into Postgres. If Bulk API 2.0 returned 10 pages of data, we could essentially download all 10 pages in parallel. In theory, this should yield a big bump in throughput, assuming there aren’t limits on Salesforce’s side that we aren’t aware of.
  • Streaming to a regular table: By streaming into a temporary table in Postgres, we lose the ability to resume the COPY if the process crashes. Alternatively, we could stream into a regular table and keep track of a watermark so that, if the sync process crashed, it wouldn’t need to re-stream from the beginning.

Sync performance is core to what we do and the product experience we deliver to our customers. If you’re thinking about syncing data from your customers’ CRMs, please reach out. We’d love to chat!
