Postgres to Elasticsearch Real-time Replication using PeerDB

Today, PeerDB is pleased to announce that our target connector for Elasticsearch is now in beta. Elasticsearch is a popular search engine system underpinned by a distributed document database, and we have been seeing a lot of use cases for Elasticsearch in our customers' data stacks. This is our first connector for a document database, and we are excited to bring PeerDB's performance, reliability and value to users looking to move data from Postgres to Elasticsearch.

This post explains some of the use cases that are enabled by Postgres to Elasticsearch replication, followed by a quick demo showcasing the high performance and low latency of Postgres to Elasticsearch replication using PeerDB. Finally, we go through a high level overview of the architecture of the connector.

Postgres to Elasticsearch Replication Use Cases

Some common use cases for Postgres to Elasticsearch replication via CDC or query replication are:

  1. Efficient search for large ingest volumes: Elasticsearch's bread-and-butter use case is as a search engine that operates efficiently even on humongous volumes of data. From full-text and weighted search to complex semantic searches using built-in NLP models, Elasticsearch is very flexible and tunable. It is commonly used for ingesting and indexing large volumes of logs, and even as a backing engine for searching large websites and internal knowledge bases.

  2. Denormalizing data to documents: Data models are often stored in Postgres in a highly normalized form, which is great for transactional integrity but bad for complex queries where you may have to use joins or CTEs. Elasticsearch, being a document database, prefers storing data in a denormalized form. Using PeerDB's query replication capabilities, you can periodically transform your data into a denormalized form that is more efficient for downstream consumers to query, as sketched below. Some processing can also be done using an Elasticsearch ingest pipeline.
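To make the denormalization idea concrete, here is a minimal Go sketch. The Order and Customer types and all field names are hypothetical, not part of PeerDB: two normalized Postgres rows are flattened into a single document ready for indexing, so no join is needed at query time.

package main

import (
    "encoding/json"
    "fmt"
)

// Hypothetical normalized rows as they might live in Postgres.
type Customer struct {
    ID   int
    Name string
}

type Order struct {
    ID         int
    CustomerID int
    Total      float64
}

// Denormalized shape of the Elasticsearch document: the customer
// fields are embedded directly, so queries need no join.
type OrderDocument struct {
    OrderID      int     `json:"order_id"`
    Total        float64 `json:"total"`
    CustomerName string  `json:"customer_name"`
}

func main() {
    o := Order{ID: 1, CustomerID: 42, Total: 99.5}
    c := Customer{ID: 42, Name: "Alice"}
    doc := OrderDocument{OrderID: o.ID, Total: o.Total, CustomerName: c.Name}
    b, _ := json.Marshal(doc)
    fmt.Println(string(b)) // {"order_id":1,"total":99.5,"customer_name":"Alice"}
}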

Low latency replication from Postgres to Elasticsearch using PeerDB

In this section, I'll walk through a quick demonstration of Postgres to Elasticsearch replication using PeerDB in Change Data Capture (CDC) mode. Using PeerDB for replication from Postgres to Elasticsearch offers a few key benefits: blazing-fast initial loads, and sub-minute latencies achieved by constantly reading the replication slot. PeerDB can offer these because it is laser-focused on Postgres replication.

Postgres Setup

You can use any Postgres database in the cloud or on-prem. For simplicity, I'm using a Postgres cluster running locally in a Docker container for this demo. We create a table named oss1 and drive a continuous ingest of 1000 rows per second via an INSERT ... SELECT over generate_series, re-run every second with psql's \watch command.

postgres=# CREATE TABLE oss1 (
           id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
           c1 INT,
           c2 INT,
           t TEXT,
           updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
         );
CREATE TABLE
postgres=# INSERT INTO oss1 (c1, c2, t)
SELECT
    generate_series AS c1,
    generate_series * 2 AS c2,
    'text_' || generate_series AS t
FROM
    generate_series(1, 1000);
INSERT 0 1000
-- \watch repeats the last statement, here once per second
postgres=# \watch 1
INSERT 0 1000
INSERT 0 1000
INSERT 0 1000

Elasticsearch Setup

You can set up an Elasticsearch instance using its Docker Compose setup on-prem or on a cloud VM. Alternatively, you can use Elastic Cloud. For this demo, I am using the Docker Compose setup running locally.

PeerDB Setup

You can use PeerDB Open Source or PeerDB Cloud to deploy a PeerDB instance. For this demo, I'm deploying PeerDB Open Source locally via Docker Compose.

Create Peers and Mirror for Postgres to Elasticsearch Replication

In the PeerDB world, peers refer to either source or target data stores, and a mirror is created between a source peer and a destination peer for data replication. You can use PeerDB's UI to create the Postgres and Elasticsearch peers, and then to create a MIRROR for replicating data from Postgres to Elasticsearch.

I have created a Change Data Capture (CDC) based MIRROR that replicates data using Postgres' Write-Ahead Log (WAL) and Logical Decoding. It involves two steps:

  1. An initial load that takes a fully consistent snapshot of existing data in Postgres and copies it to Elasticsearch. Through PeerDB's parallel snapshotting, you can expect significantly faster initial loads; we've seen terabytes of data moved in hours rather than days.

  2. Change Data Capture (CDC): Once the initial load is completed, PeerDB constantly reads changes in Postgres through the logical replication slot and replicates those changes to Elasticsearch. Thanks to our streaming architecture, expect data latency in the range of seconds for a continuously running mirror to Elasticsearch.

The initial load should complete pretty quickly, and the rows should be present in the newly created Elasticsearch index. After the mirror enters continuous CDC mode, new rows show up as they are inserted. Attached below is a quick video showing a Postgres to Elasticsearch CDC mirror in action.
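If you'd rather verify this from a terminal than the UI, here is a minimal sketch using the official go-elasticsearch client to count documents in the target index. The index name follows the schema.table convention shown later in this post; adjust the address and any authentication for your setup.

package main

import (
    "fmt"
    "log"

    "github.com/elastic/go-elasticsearch/v8"
)

func main() {
    // Point the client at the Elasticsearch instance from the demo.
    es, err := elasticsearch.NewClient(elasticsearch.Config{
        Addresses: []string{"http://localhost:9200"},
    })
    if err != nil {
        log.Fatal(err)
    }

    // Count documents in the index the mirror writes to.
    res, err := es.Count(es.Count.WithIndex("public.oss1"))
    if err != nil {
        log.Fatal(err)
    }
    defer res.Body.Close()
    fmt.Println(res) // e.g. [200 OK] {"count":123000,...}
}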

Architecture and Design Choices

We've talked about PeerDB's streaming architecture in detail before, but in summary, PeerDB uses Go's goroutines and channels to efficiently read data from Postgres via logical replication and push it to Elasticsearch in batches through the Bulk API. This approach speeds up execution by enabling parallel processing.
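Here is a heavily simplified sketch of that shape; the Record type and the stubbed-out flushBulk are illustrative, not PeerDB's actual code. One goroutine streams decoded changes into a channel while the main goroutine drains it into batches, issuing one bulk request per batch.

package main

import "fmt"

// Record is a stand-in for one decoded change from the replication slot.
type Record struct {
    ID   string
    Body string
}

// flushBulk stands in for a single Elasticsearch Bulk API request.
func flushBulk(batch []Record) {
    fmt.Printf("bulk request with %d documents\n", len(batch))
}

func main() {
    records := make(chan Record, 1024)

    // Producer: in PeerDB, this side reads the logical replication slot.
    go func() {
        for i := 0; i < 2500; i++ {
            records <- Record{ID: fmt.Sprint(i), Body: "..."}
        }
        close(records)
    }()

    // Consumer: accumulate records and flush one batch per bulk request.
    const batchSize = 1000
    batch := make([]Record, 0, batchSize)
    for rec := range records {
        batch = append(batch, rec)
        if len(batch) == batchSize {
            flushBulk(batch)
            batch = batch[:0]
        }
    }
    if len(batch) > 0 {
        flushBulk(batch) // final partial batch
    }
}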

Our data warehouse connectors store the data in a staging table before pushing it to the final table for cost and performance reasons. Due to Elasticsearch's architecture and query language, we are able to avoid this intermediate step and directly send the stream of processed records to Elasticsearch indices via the Bulk API.

Handling Updates and Deletes in Elasticsearch

PeerDB supports Elasticsearch as a target for both CDC and query replication. In most cases we recommend CDC because of its ease of use, increased reliability, and its ability to replicate DELETEs to Elasticsearch. However, CDC limits the scope of transformations that can be performed before loading to Elasticsearch.

To support deduplication on the Elasticsearch side, we need a unique ID for each document that remains consistent so we can update or delete it as per the source. For tables with one column in the primary key, the value of the column itself can be used. For tables with multiple columns in the primary key, we instead choose to hash the values of the columns together, giving a small unique identifier irrespective of the width of the row.

// simplified version of PeerDB's Go code
package connector

import (
    "crypto/sha256"
    "encoding/base64"
    "fmt"
)

// primaryKeyColsHash derives a compact, stable document ID by hashing
// the record's primary key column values together.
func primaryKeyColsHash(record []any, colIndices []int) string {
    hasher := sha256.New()

    for _, colIndex := range colIndices {
        // write each primary key value to the hasher
        _, _ = fmt.Fprint(hasher, record[colIndex])
    }
    hashBytes := hasher.Sum(nil)
    // URL-safe base64 without padding keeps the _id short and clean
    return base64.RawURLEncoding.EncodeToString(hashBytes)
}

Below is a sample document uploaded by PeerDB to Elasticsearch. Note how the _id field is a (base64-encoded) hash of the primary key columns id and c1:

{
  "_index": "public.oss2",
  "_id": "SAgdSqEaQyGYWxOo8Dj2s0DbXsQXLTC_CWlds8-c4kY",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "c1": 434017,
    "c2": 922856,
    "id": 8,
    "t": "pgbenchinsertc4b998821cc6b161e65489b3",
    "updated_at": "2024-05-08T18:33:39.031107Z"
  }
}

Query replication can be done in append mode, where any change creates a fresh document in Elasticsearch, or in upsert mode, where some columns are designated as key columns and deduplicated in a way similar to CDC.
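In practice, the difference between the two modes comes down to how the document _id is chosen. Here is a sketch building on the primaryKeyColsHash helper from earlier; newAppendID and documentID are illustrative names, not PeerDB's actual API.

package connector

import (
    "crypto/rand"
    "encoding/hex"
)

// newAppendID returns a fresh random ID, so in append mode every
// change lands as a brand new document. (Illustrative only.)
func newAppendID() string {
    b := make([]byte, 16)
    _, _ = rand.Read(b)
    return hex.EncodeToString(b)
}

// documentID picks the Elasticsearch _id for a record: upsert mode
// derives it from the designated key columns so repeat changes
// overwrite the same document (deduplicating like CDC), while
// append mode always generates a new one.
func documentID(upsert bool, record []any, keyIndices []int) string {
    if upsert {
        return primaryKeyColsHash(record, keyIndices)
    }
    return newAppendID()
}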

Dynamic Mapping for Data Types

By default, PeerDB currently uses Elasticsearch's dynamic mapping to automatically infer a data type mapping from the contents of the documents in an index. In practice, numeric types are mapped to either long or float, timestamp types are mapped to date, and most other types map to text. A more detailed mapping is available here. This works for many use cases, but if needed, an explicit mapping can be provided by the user during manual index creation, and PeerDB will load documents into that index.
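For instance, pre-creating the index with an explicit mapping might look like the following sketch with the go-elasticsearch client. The field types here match the demo table and are only one reasonable choice; PeerDB then loads documents into this existing index.

package main

import (
    "log"
    "strings"

    "github.com/elastic/go-elasticsearch/v8"
)

func main() {
    // Uses ELASTICSEARCH_URL or http://localhost:9200 by default.
    es, err := elasticsearch.NewDefaultClient()
    if err != nil {
        log.Fatal(err)
    }

    // Explicit mapping for the demo table's columns, instead of
    // relying on dynamic mapping.
    mapping := `{
      "mappings": {
        "properties": {
          "id":         { "type": "long" },
          "c1":         { "type": "long" },
          "c2":         { "type": "long" },
          "t":          { "type": "text" },
          "updated_at": { "type": "date" }
        }
      }
    }`

    res, err := es.Indices.Create(
        "public.oss1",
        es.Indices.Create.WithBody(strings.NewReader(mapping)),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer res.Body.Close()
    log.Println(res)
}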

Conclusion

The Elasticsearch connector is in beta -- we already have customers who have moved billions of rows from Postgres to Elasticsearch using PeerDB. If you're an Elasticsearch user and wish to replicate data from Postgres to Elasticsearch, do give PeerDB a shot! We would love to help you out and hear your feedback:

  1. Try PeerDB Cloud for free.

  2. Visit PeerDB's GitHub repository to Get Started.

  3. Join our Slack and say hi!

Thanks for reading!