How to build a real-time CDC event integration with your customer’s Salesforce

Syncing CRM data to your application on a batch schedule is one of the most common ways to integrate with your customers’ Salesforce. For example, fetching Account and Opportunity records every 4 hours to power a lead scoring algorithm. But some sales automation use cases require knowing about changes to customer CRM records in real time, like triggering an internal workflow or Slack notification immediately after an Opportunity’s status changes to closed-won. In these use cases, consuming Salesforce change events in real time is critical to delivering a great product experience for end users.

Many SaaS products (including HubSpot) offer an HTTP webhooks API to support this real-time event integration use case, but Salesforce doesn’t have a webhooks API. Instead, Salesforce lets you consume Change Data Capture (CDC) events via a gRPC-based Pub/Sub API, a less common but more robust pattern for consuming events. However, it’s also significantly more difficult to work with and comes with several Salesforce-specific quirks that developers must work around.

This guide explains how the Salesforce Pub/Sub API works and how to use it to build a real-time CDC event integration with your customers’ Salesforce.

Prerequisites

To get the most out of this guide, you should have the following:

  1. A Node.js development environment with npm
  2. An existing OAuth implementation that obtains your customer’s Salesforce access token, instance URL, and tenant ID
  3. Change Data Capture enabled in your customer’s Salesforce org for the objects you want to receive events for

Introduction to gRPC

gRPC is a modern, open-source, high-performance remote procedure call (RPC) framework that can operate in any environment. It uses protobuf as its interface definition language, which helps developers define services and message types for their RPC calls. gRPC supports long-running connections and bi-directional communication, which the Salesforce Pub/Sub gRPC API leverages for subscribing to CDC events.

(Diagram: Pub/Sub API flow control. Source: https://developer.salesforce.com/docs/platform/pub-sub-api/guide/flow-control.html)
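To make the bi-directional streaming concrete, here is an abridged sketch of the relevant RPCs from Salesforce’s pubsub_api.proto (the full file lives in the developerforce/pub-sub-api repository linked later in this guide; treat the exact shape shown here as illustrative):

service PubSub {
  // bi-directional stream: the client sends FetchRequests and Salesforce
  // streams back FetchResponses containing CDC events
  rpc Subscribe (stream FetchRequest) returns (stream FetchResponse);
  // fetches the Avro schema for a given schema ID
  rpc GetSchema (SchemaRequest) returns (SchemaInfo);
  // fetches metadata about a channel/topic
  rpc GetTopic (TopicRequest) returns (TopicInfo);
}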

Introduction to Avro

Avro is a data serialization system used to exchange data between different systems. It is similar to other serialization formats such as JSON and XML, but is designed to be high-performance and language-agnostic. One of its benefits is a compact binary format that is very efficient to transmit over the network. In the context of Salesforce, Avro is used to encode the payloads of Change Data Capture (CDC) events sent to subscribers via the Salesforce Pub/Sub gRPC API.
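To make this concrete, here is a small standalone example of encoding a record to Avro’s compact binary format and decoding it back. It uses the avsc library we install in the Setup section below, with a made-up schema purely for illustration:

import avro from 'avsc';

// a made-up record schema, just to illustrate Avro encoding/decoding
const type = avro.Type.forSchema({
  type: 'record',
  name: 'Opportunity',
  fields: [
    { name: 'Name', type: 'string' },
    { name: 'Amount', type: 'double' },
  ],
});

// encode to a compact binary Buffer, then decode back to an object
const buf = type.toBuffer({ Name: 'Acme - New Business', Amount: 50000 });
console.log(buf.length);          // far smaller than the equivalent JSON
console.log(type.fromBuffer(buf)); // logs the decoded record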

Setup

For this project, we will use Buf’s Connect libraries, which provide an easy-to-use, modern interface for gRPC clients in Node.js. We will also use the fast and modern avsc library to decode the Avro-encoded event payloads.

Let’s start by installing the @bufbuild/connect, @bufbuild/connect-node, and avsc libraries. The @bufbuild/* libraries and the code they generate are ES modules, so we need to add the following to our package.json file:

"type": "module"

We can then install the dependencies using npm:

npm install @bufbuild/connect @bufbuild/connect-node avsc

Compiling Protobuf Files

We need to generate the gRPC client code from the Salesforce Pub/Sub API protobuf file. To do this, we use the buf tool, which is a command-line tool that helps us manage protobuf files and generate client code from them.
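If you don’t already have the buf CLI and the Pub/Sub API protobuf file locally, one way to get them (assuming the proto file still lives at the root of Salesforce’s developerforce/pub-sub-api repository) is:

npm install -g @bufbuild/buf
curl -O https://raw.githubusercontent.com/developerforce/pub-sub-api/main/pubsub_api.proto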

Next, create a buf.gen.yaml file in your project directory and add the following:

version: v1
managed:
  enabled: true
plugins:
  - plugin: buf.build/bufbuild/connect-es
    out: gen
  - plugin: buf.build/bufbuild/es
    out: gen

This configuration file tells the buf tool to generate the gRPC client code into the gen directory. Now we can generate the client code by running the following command:

buf generate pubsub_api.proto

Connecting to Salesforce Pub/Sub gRPC API

After compiling the protobuf files to client code, we can connect to the Salesforce Pub/Sub gRPC API. Here we assume that you already have code in place that obtains the customer’s access token, instance URL, and tenant ID via OAuth.

import { createPromiseClient } from '@bufbuild/connect';
import { createGrpcTransport } from '@bufbuild/connect-node';
import { PubSub } from './gen/pubsub_api_connect.js';

const baseUrl = 'https://api.pubsub.salesforce.com:443';

const transport = createGrpcTransport({
  baseUrl,
  httpVersion: '2', // gRPC requires HTTP/2
  interceptors: [
    (next) => async (req) => {
      // add the customer's credentials (obtained via your OAuth flow) to the
      // headers of each request
      req.header.append('accesstoken', accessToken);
      req.header.append('instanceurl', instanceUrl);
      req.header.append('tenantid', tenantId);

      return await next(req);
    },
  ],
});

const client = createPromiseClient(PubSub, transport);
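
Before subscribing, it can be useful to sanity-check the connection and the customer’s permissions. A minimal sketch using the Pub/Sub API’s GetTopic RPC (treat the response contents as illustrative):

// optional sanity check: fetch metadata for the channel we plan to subscribe
// to; if this call fails, the credentials or channel name are likely wrong
const topicInfo = await client.getTopic({
  topicName: '/data/OpportunityChangeEvent',
});
console.log(topicInfo);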

Consuming and Decoding CDC Events

Having created a client for the Salesforce Pub/Sub gRPC API, we can start consuming and decoding CDC events. Salesforce publishes these events to channels: either the channel for all change events in the org (/data/ChangeEvents), or a channel per entity type (/data/<EntityName>ChangeEvent). In our example, we will subscribe to the channel for Opportunity change events, /data/OpportunityChangeEvent.

The protobuf messages we get from the Pub/Sub gRPC API have this general shape:

message FetchResponse {
  repeated ConsumerEvent events = 1;
  bytes latest_replay_id = 2;
  string rpc_id = 3;
  int32 pending_num_requested = 4;
}
message ConsumerEvent {
  ProducerEvent event = 1;
  bytes replay_id = 2;
}
message ProducerEvent {
  string id = 1;
  string schema_id = 2;
  bytes payload = 3; // <-------- this is the Avro-encoded event payload
  repeated EventHeader headers = 4;
}
message EventHeader {
  string key = 1;
  bytes value = 2;
}

// Source: https://github.com/developerforce/pub-sub-api/blob/main/pubsub_api.proto

And the decoded Avro payloads have this shape:

{
  "schema": <schema_ID>,
  "payload": {
    "ChangeEventHeader": {
      "entityName": "...",
      "recordIds": "...",
      "changeType": "...",
      "changeOrigin": "...",
      "transactionKey": "...",
      "sequenceNumber": "...",
      "commitTimestamp": "...",
      "commitUser": "...",
      "commitNumber": "...",
      "nulledFields": [...],
      "diffFields": [...],
      "changedFields": [...]
    },
    "field1": "...",
    "field2": "...",
    . . .
  },
  "event": {
    "replayId": <replayID>
  }
}

// Source: https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_message_structure.htm

Now, we subscribe to the /data/OpportunityChangeEvent channel and start consuming events:

import { Readable } from 'stream';
import { pipeline } from 'stream/promises';
import avro from 'avsc';
import { ReplayPreset } from './gen/pubsub_api_pb.js';

async function consumeAndDecodeCDCEvents() {
    const subscriptionRequest = {
      topicName: '/data/OpportunityChangeEvent',
      replayPreset: ReplayPreset.LATEST,
      numRequested: 100,
    };

    // in a production system, this should probably be implemented as a queue
    // you can push new requests into, so you can request more events once you
    // have received the number of events requested in the previous request
    const subscriptionRequests = Readable.from([subscriptionRequest]);

    // this sets up a two-way stream with Salesforce
    const cdcEventStream = Readable.from(
        client.subscribe(subscriptionRequests)
    );

    await pipeline(
        cdcEventStream,
        // pipeline passes the whole upstream iterable to this async generator
        async function * handleMessages(messages) {
            for await (const message of messages) {
                // extract the event objects and latest replayId from the message;
                // in a production system, you would persist the latestReplayId
                // so you can restart the stream on error and request more events
                // from the correct offset
                const { events, latestReplayId } = message;

                for (const { event } of events) {
                    const { schemaId, payload } = event;

                    // you would probably want to cache this instead of requesting
                    // it from the API for every event, since the schema is
                    // usually stable
                    const { schemaJson } = await client.getSchema({ schemaId });
                    const schema = avro.Type.forSchema(JSON.parse(schemaJson));

                    // decode the Avro payload (avsc expects a Buffer)
                    yield schema.fromBuffer(Buffer.from(payload));
                }
            }
        },
        // do something with each decoded payload,
        // like persisting it to a queue for processing
        async function handleDecoded(decodedPayloads) {
            for await (const decoded of decodedPayloads) {
                console.log(decoded);
            }
        },
    );
}

consumeAndDecodeCDCEvents().catch(console.error);

This script will continue running and log to the console each time a CDC event is received from Salesforce. In your system, you could push the Avro payload into an internal queue for processing.

Other considerations

This basic implementation shows how you can start consuming CDC events, but it doesn't cover some of the Salesforce-specific quirks you need to handle before bringing your integration to production.

Decoding field bitmaps

Each Avro message payload includes a ChangeEventHeader with additional information about the event, including three bitmap fields: nulledFields (fields whose values changed to null), changedFields (fields whose values changed to a new value), and diffFields (fields whose values are diffs to be applied to the current value of that field on the record the event was produced for).

A bitmap field uses a bitmap encoded as a hexadecimal string to represent the fields in an entity. This method is more space efficient than using a list of field names. For example, in the changedFields bitmap field, a bit set to 1 indicates that the field at the corresponding position in the Avro schema was changed.

A bitmap field is an array of strings. The first element of the array contains a bitmap for the individual fields in the schema. Compound fields are placed in additional elements of the array, with bitmaps indicating nested fields. The format for the additional array elements is {ParentFieldPosition}-{NestedFieldBitmap}. For example, 3-0x0000c2 would indicate a bitmap of 0x0000c2 for the nested fields of the compound field in the third position in the Avro schema.
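As a rough illustration, here is a minimal sketch of decoding a top-level bitmap entry into field names. It assumes schemaFields is the ordered list of field names taken from the decoded Avro record schema (for example schema.fields.map((f) => f.name)) and that the least significant bit corresponds to the first field; the nested entries for compound fields are skipped:

function bitmapToFieldNames(bitmapEntries, schemaFields) {
  const changed = [];
  for (const entry of bitmapEntries) {
    if (entry.includes('-')) {
      // "{ParentFieldPosition}-{NestedFieldBitmap}" entries describe the
      // nested fields of compound fields; handling them is omitted here
      continue;
    }
    const bits = BigInt(entry); // e.g. BigInt('0x0000c2')
    for (let i = 0; i < schemaFields.length; i++) {
      if ((bits >> BigInt(i)) & 1n) {
        changed.push(schemaFields[i]);
      }
    }
  }
  return changed;
}

For example, given the bitmap '0x0000c2', this returns the field names at positions 1, 6, and 7 in the schema.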

This is quite complex to deal with, but we do handle it in the Supaglue product. The code we included in Supaglue for identifying the changed fields using the bitmap fields can be found here.

Handling Gap and Overflow events

Many streaming pub/sub platforms offer delivery guarantees, like Kafka’s at-least-once or exactly-once semantics. Salesforce’s Pub/Sub API does not offer these guarantees and can potentially be lossy. Some of the events you receive may be Gap or Overflow Events, which need to be handled differently by your system depending on your use case. Handling these events can be fairly complex, especially Overflow Events.

Gap events

Salesforce produces a Gap Event when it is unable to generate a regular change event: for example, when an error or other issue occurs within Salesforce, when the generated change event would exceed 1 MB in size, or when the change was made by a process (like a data cleanup job) that modifies records directly in the database.

In this case, Salesforce sends an event with a changeType of GAP_CREATE, GAP_UPDATE, GAP_DELETE, or GAP_UNDELETE that contains only the header data, including the record ID of the changed record. It’s up to the developer to fetch the record from the Salesforce API to get the latest data.

A Gap Event looks something like this:

{
  "data":{
    "schema":"5GiC-KFwBGmSyDn8mqrXgg",
    "payload":{
      "ChangeEventHeader":{
        "commitNumber":288672011,
        "commitUser":"005xx000001TT96AAG",
        "sequenceNumber":1,
        "entityName":"Task",
        "changeType":"GAP_UPDATE",
        "changedFields": [],
        "changeOrigin":"",
        "transactionKey":"0Mcxx000000000B",
        "commitTimestamp":1509502088161,
        "recordIds":[
            "00Uxx000000fvRQEAY"
        ]
      }
    },
    "event":{
        "replayId":15
    }
  },
  "channel":"/data/ChangeEvents"
}

In this case, the event is produced for a Task entity with the record ID 00Uxx000000fvRQEAY, so you could use that information to look up the entity using the REST API to figure out what changed in the record.
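
For example, here is a minimal sketch of fetching the current state of the record via the REST API. It assumes Node 18+ for the global fetch, and that instanceUrl and accessToken come from your OAuth flow; the API version is illustrative:

async function fetchRecord(entityName, recordId) {
  // GET /services/data/vXX.X/sobjects/{EntityName}/{recordId} returns the
  // current state of the record
  const res = await fetch(
    `${instanceUrl}/services/data/v58.0/sobjects/${entityName}/${recordId}`,
    { headers: { Authorization: `Bearer ${accessToken}` } },
  );
  if (!res.ok) {
    throw new Error(`Failed to fetch ${entityName}/${recordId}: ${res.status}`);
  }
  return res.json();
}

// e.g. for the Gap Event above:
// const task = await fetchRecord('Task', '00Uxx000000fvRQEAY');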

Overflow events

In the case where over 100,000 changes happen in a single transaction, Salesforce emits standard CDC events for the first 100,000 changes, and a single Overflow Event per entity type for the changes over the first 100,000. For example, if a cascade delete results in the deletion of 110,000 account, contact, opportunity and activity records in a single transaction, you would receive events for the first 100,000 records deleted, and one Overflow Event for each of the entity types for the remaining 10,000 records.

These Overflow Events look similar to Gap Events in that they only include the header, and they have a changeType of GAP_OVERFLOW:

{
  "data": {
    "schema": "DSE-RYlg-96DubFClb3e4Q",
    "payload": {
      "ChangeEventHeader": {
        "commitNumber": 170898760,
        "commitUser": "005xx000001SySSAA0",
        "sequenceNumber": 11,
        "entityName": "Activity",
        "changeType": "GAP_OVERFLOW",
        "changedFields": [],
        "changeOrigin": "",
        "transactionKey": "0000161d-7324-6ed7-36f8-16ebb5d61ec9",
        "commitTimestamp": 1513039469186,
        "recordIds": [
          "000000000000000AAA"
        ]
      }
    },
    "event": {
      "replayId": 13
    }
  },
  "channel": "/data/ChangeEvents"
}

In these events, the recordIds field always contains the placeholder "000000000000000AAA", which doesn’t identify any real record.

When you receive an Overflow Event, the correct process to handle synchronization is pretty complex:

  1. Unsubscribe from the channel, and stop processing further events since you have to do a full synchronization to get back into a consistent state.
  2. Persist the `replayId` of the overflow event.
  3. Reconcile the data for new, updated, and undeleted records:
    1. Retrieve all records from the Salesforce API for the entity specified in the header.
    2. Synchronize the data in your system by overwriting your data with these fetched records.
  4. Reconcile the deleted records by performing one of the following actions:
    1. Get the non-deleted records from Salesforce, and synchronize:
      1. Identify all records for that entity in your system that weren’t updated through the synchronization that you performed in step 3. These would be records that were deleted.
      2. Delete the identified records from your system.
    2. Or get the deleted records from Salesforce, and synchronize:
      1. Query all records from the Salesforce API for the entity with `isDeleted=true` (see the sketch after this list).
      2. Identify the records in your system that match the records returned in the previous step and delete these.
  5. Resubscribe to the channel starting from the `replayId` you saved in step 2 and continue processing the events.
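
Here is a minimal sketch of the second option in step 4, using the REST API’s queryAll endpoint (which, unlike query, includes deleted records). It assumes Node 18+ for the global fetch and that instanceUrl and accessToken come from your OAuth flow; pagination via nextRecordsUrl is omitted:

async function fetchDeletedRecordIds(entityName) {
  // queryAll returns records that match the SOQL query, including deleted ones
  const soql = `SELECT Id FROM ${entityName} WHERE IsDeleted = true`;
  const url = `${instanceUrl}/services/data/v58.0/queryAll?q=${encodeURIComponent(soql)}`;
  const res = await fetch(url, {
    headers: { Authorization: `Bearer ${accessToken}` },
  });
  if (!res.ok) {
    throw new Error(`queryAll failed for ${entityName}: ${res.status}`);
  }
  const { records } = await res.json();
  return records.map((record) => record.Id);
}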

Conclusion

This was a basic guide on how to consume and decode Avro-formatted CDC events from Salesforce using the Salesforce Pub/Sub gRPC API. We also covered some of the more complex considerations, like how to identify which fields changed via the bitmap fields in the event headers, and how to handle Gap and Overflow Events.

Building a real-time CDC events integration with Salesforce is doable, but it’s tricky to get right and even more complex to maintain. Yet a real-time Salesforce integration delivers superior user experiences and differentiation against competitors who fall back to batch syncs. If you’re looking for a real-time Salesforce integration but don’t want to sink significant engineering resources into it, we’d love to help.

Supaglue has a robust implementation with all these details considered that produces events you can consume using standard HTTP webhooks. We even handle the OAuth flow (including token refresh) so you don’t have to! If you are interested in trying this out, read about our real-time events feature or sign up to get early access.
