# Exploring versions of the Postgres logical replication protocol

## **Introduction**

[Logical Replication](https://www.postgresql.org/docs/current/logical-replication.html) is one of the [many](https://www.postgresql.org/docs/16/different-replication-solutions.html) ways a Postgres database can replicate data to other Postgres databases (a.k.a. standbys). Logical replication reads directly from the [write-ahead log](https://www.postgresql.org/docs/current/wal-intro.html) (WAL), which records every database change. This means it does not need to intercept queries or periodically read tables. The changes are filtered, serialized and then sent to the standby servers, where they can be applied. Logical replication is intended for Postgres databases to send and receive changes. It also lets ETL tools like [PeerDB](https://www.peerdb.io/) get a reliable stream of changes that can be processed as needed.

Logical replication started out by only allowing streaming of committed transactions. It then evolved to support streaming of in-flight transactions, followed by [two-phase commits](https://www.postgresql.org/docs/current/two-phase.html), and then parallel apply of in-flight transactions. This blog dives into this evolution and its impact on performance, and presents some useful benchmarks. It should be useful for anyone who uses Postgres logical replication in practice!

## **Components of logical replication**

For a quick rundown, a full logical replication setup involves several crucial components. **Please skip this section if you are already familiar with the concepts of logical replication.**

1\. [**Replication Slot**](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html#LOGICALDECODING-REPLICATION-SLOTS): A replication slot on the primary server reads changes from the WAL and passes them to the output plugin. The plugin serializes them so the standby server (or ETL tool) can apply them. Periodically, the standby sends a message to the primary confirming that it has processed the WAL up to a certain point, at which point the slot can advance.

2\. [**Publication**](https://www.postgresql.org/docs/current/logical-replication-publication.html): A publication is essentially a filter on the WAL changes. Publications are very powerful: they can select schemas, tables and even particular columns of tables. You can also choose to publish inserts but not updates, and apply row filters to skip certain rows. When a standby starts reading from a replication slot, a set of publications is passed as input.

3\. [**Subscriptions**](https://www.postgresql.org/docs/current/logical-replication-subscription.html): A subscription is basically the Postgres syntax for creating a logical replication connection to a primary server for replicating changes from a slot and a set of publications. The standby then reads data from the primary and replicates it as long as the subscription is active. While this is Postgres specific, other tools end up behaving like subscribed standbys and get the same output from the primary server.

4\. [**Output plugins**](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html#LOGICALDECODING-EXPLANATION-OUTPUT-PLUGINS): The replication slot passes raw WAL change data to an output plugin, which serializes it into a stream of messages. This helps with the interoperability of logical replication, since the message format is independent of the underlying database version or configuration. The de-facto output plugin is `pgoutput`, which ships with Postgres. Other plugins like `wal2json` and `decoderbufs` also enjoy support in the community.

## **Wait, logical replication has versions?**

When starting logical replication ([START\_REPLICATION](https://www.postgresql.org/docs/current/protocol-logical-replication.html)), there is a [parameter](https://www.postgresql.org/docs/current/protocol-logical-replication.html) called `proto_version` that allows users to opt in to newer semantics of the logical replication protocol. Starting with Postgres 14 in September 2021, three new `proto_version`s of logical replication have been added in consecutive releases. Looking at the docs for `proto_version` right now, we see this:

```plaintext
proto_version
    Protocol version. Currently versions 1, 2, 3, and 4 are supported.
    
    Version 2 is supported only for server version 14 and above, 
    and it allows streaming of large in-progress transactions.
    
    Version 3 is supported only for server version 15 and above, 
    and it allows streaming of two-phase commits.
    
    Version 4 is supported only for server version 16 and above, and it 
    allows streams of large in-progress transactions to be applied 
    in parallel.
```

While these all sound like good things, it's not clear to the average reader what they mean or which problems they tackle. Even informed readers who know what these changes mean would benefit from understanding how they are implemented and how they affect real-world workloads.

### **v1 - the status quo**

To analyze the messages and semantics of the various protocol versions, we've written a small Go application called `polorex`. If you want to read the code or try things out yourself, check out [this repo](https://github.com/PeerDB-io/polorex).

To simulate a workload, we run 2 transactions concurrently, both inserting rows into the same table. Each transaction inserts 100 batches of 250,000 rows, for a total of 50 million rows across both. The workload is generated by the `txngen` subcommand of `polorex`. Another subcommand, `txnreader`, reads and analyzes the transactions by connecting to the database and continuously reading from the replication slot.

```plaintext
./polorex txnreader -port 7132
[in a different terminal]
./polorex txngen -port 7132 -iterations 100 -batchsize 250000 -parallelism 2
```

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1706541313918/7fc71484-2f5c-47a4-9ad5-4c974251dd2c.png align="center")

The transactions start at the green line and end at the red line. We can see that the transactions are read only after they commit, and it takes 3-4 minutes to decode both of them. Since we just committed 2 large transactions, the `pgoutput` plugin has to read a lot of WAL at once and serialize it into 50 million `INSERT` messages. The graph shows that we are reading almost 250K inserts per second. Still, one can see how this could quickly get out of hand for larger transactions with wider schemas: we could fall behind the primary server purely due to this decoding overhead.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1706541553668/831dc1f1-7a86-4d24-a0ae-89c535f89dba.png align="center")

Another, less obvious issue that follows from this concerns the size of the replication slot: the amount of WAL retained so the slot can decode changes without losing any data. In the graph, the slot size rises quickly as the transactions progress, **but also stays high until both transactions are read**, at which point it falls dramatically. This can be a problem for workloads with high throughput and large transactions. The retained WAL can reach hundreds of gigabytes within a matter of hours, filling the disk and crashing the Postgres server.

With this in mind, version 2's promise of allowing `streaming of large in-progress transactions` sounds enticing. But there is also a simplicity to version 1 only sending changes once they are committed: once we read a `BeginMessage`, everything from there onward is fair game to be replicated immediately. In contrast, an in-progress transaction could be rolled back at any point, so all the changes read so far need to be staged somehow before being replicated.
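Version 1's simplicity can be sketched in a few lines. The sketch uses a hypothetical `Msg` type that stands in for already-decoded `pgoutput` messages; these are not `polorex`'s actual types. Every insert is applied as soon as it arrives. After a Commit, the consumer would report the transaction's end LSN back to the primary so the slot can advance.

```go
package main

import "fmt"

// Kind identifies a hypothetical, already-decoded pgoutput message.
type Kind int

const (
	Begin Kind = iota
	Insert
	Commit
)

// Msg is a simplified message: Row is set on Insert, EndLSN on Commit.
type Msg struct {
	Kind   Kind
	Row    string
	EndLSN uint64
}

// consumeV1 applies rows immediately: with proto_version 1 a transaction is
// only sent after it commits, so nothing can be rolled back anymore. It
// returns the end LSN of the last commit, the position to confirm once the
// applied rows are durable.
func consumeV1(msgs []Msg, apply func(string)) (confirm uint64) {
	for _, m := range msgs {
		switch m.Kind {
		case Insert:
			apply(m.Row)
		case Commit:
			confirm = m.EndLSN
		}
	}
	return confirm
}

func main() {
	var applied []string
	msgs := []Msg{
		{Kind: Begin},
		{Kind: Insert, Row: "a"},
		{Kind: Insert, Row: "b"},
		{Kind: Commit, EndLSN: 0x16B3748},
	}
	lsn := consumeV1(msgs, func(r string) { applied = append(applied, r) })
	fmt.Printf("applied %v, confirm %X/%X\n", applied, lsn>>32, uint32(lsn))
	// applied [a b], confirm 0/16B3748
}
```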

### **v2 - rows down the stream**

To begin with, we restart `txnreader` with a flag to ask it to use protocol version 2 while connecting to the slot. We then rerun the same `txngen` workload.

```plaintext
./polorex txnreader -port 7132 -protocol 2
[in a different terminal]
./polorex txngen -port 7132 -iterations 100 -batchsize 250000 -parallelism 2
```

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1706548856165/1b570457-9d75-45b2-baa2-1b3e48ffaa73.png align="center")

The transactions are processed completely differently here. We're clearly getting rows well before the transaction commits: this is `streaming of in-progress transactions` in action! Rows for a particular transaction arrive between a `StreamStartMessage` and a `StreamStopMessage`. We receive several of these stream blocks while the transactions are still running. Blocks for both of our transactions arrive before either commits, but each block carries changes for only one transaction at a time.

A streamed transaction now commits with a `StreamCommitMessage`. Unlike the `Commit` message in version 1, we **need** to wait for this message before applying the streamed changes, since the fate of the transaction is unknown until it arrives. Alternatively, we could receive a `StreamAbortMessage`, which means the transaction was rolled back, so none of its changes should be applied.
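Staging can be sketched as follows, again with hypothetical, already-decoded message types rather than a real client library. Changes inside a stream block are buffered under the transaction ID announced by its Stream Start. They are applied on Stream Commit or dropped on Stream Abort. Changes outside stream blocks belong to ordinary, already committed transactions and are applied directly.

For simplicity, the sketch keeps staged rows in memory and ignores subtransaction aborts (a real Stream Abort also carries a subtransaction ID). The non-parallel apply worker in Postgres writes streamed changes to temporary files instead.

```go
package main

import "fmt"

// Kind identifies a hypothetical, already-decoded pgoutput message. The
// stream kinds correspond to pgoutput's Stream Start ('S'), Stream Stop ('E'),
// Stream Commit ('c') and Stream Abort ('A') messages.
type Kind int

const (
	StreamStart Kind = iota
	StreamStop
	Insert
	StreamCommit
	StreamAbort
)

// Msg is simplified: Xid is set on stream start/commit/abort, Row on Insert.
type Msg struct {
	Kind Kind
	Xid  uint32
	Row  string
}

// stager stages streamed rows per transaction until their fate is known.
type stager struct {
	inStream bool
	current  uint32              // xid of the open stream block
	pending  map[uint32][]string // staged rows keyed by xid
	apply    func(string)
}

func newStager(apply func(string)) *stager {
	return &stager{pending: make(map[uint32][]string), apply: apply}
}

func (s *stager) handle(m Msg) {
	switch m.Kind {
	case StreamStart:
		s.inStream, s.current = true, m.Xid
	case StreamStop:
		s.inStream = false
	case Insert:
		if !s.inStream {
			s.apply(m.Row) // non-streamed transaction: already committed
			return
		}
		s.pending[s.current] = append(s.pending[s.current], m.Row)
	case StreamCommit:
		for _, row := range s.pending[m.Xid] {
			s.apply(row)
		}
		delete(s.pending, m.Xid)
	case StreamAbort:
		delete(s.pending, m.Xid) // rolled back: discard staged rows
	}
}

func main() {
	var applied []string
	s := newStager(func(r string) { applied = append(applied, r) })
	for _, m := range []Msg{
		{Kind: StreamStart, Xid: 740}, {Kind: Insert, Row: "a"}, {Kind: StreamStop},
		{Kind: StreamStart, Xid: 741}, {Kind: Insert, Row: "x"}, {Kind: StreamStop},
		{Kind: StreamStart, Xid: 740}, {Kind: Insert, Row: "b"}, {Kind: StreamStop},
		{Kind: StreamAbort, Xid: 741},
		{Kind: StreamCommit, Xid: 740},
	} {
		s.handle(m)
	}
	fmt.Println(applied) // [a b]
}
```

Interleaved blocks from two transactions are kept apart by xid. Only the committed transaction's rows reach `apply`, and only after its Stream Commit.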

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1706550671096/fd6abba6-31a0-4380-9cc4-e98207c4e9c6.png align="center")

The improvements from streaming are nothing short of dramatic: transactions are fully read seconds after the rows finish inserting, roughly 4 minutes earlier than with version 1. As a result, the slot size also drops back much sooner.

### **Results - v2 enables faster decoding and shorter peak slot size duration**

To be clear, there is no magical improvement in decoding throughput or peak slot size. The transactions take about the same time to process and generate the same amount of WAL. The gain comes from decoding happening concurrently with the transactions instead of after they commit, so the changes are fully read much sooner.

With version 2, transactions are fully decoded and the slot size drops right after the transactions complete, whereas version 1 needs about 4 more minutes. This can have a drastic impact on workloads with high throughput and sizable transactions. Version 2 can be very helpful in improving logical decoding performance and keeping the slot size in check!

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1706568527289/9278eb76-9e32-4916-8b12-0641636edd3a.png align="center")

### **v3 and v4 - 2PC and parallel apply**

Version 3 introduces new message types to handle [two-phase commit](https://www.postgresql.org/docs/current/sql-prepare-transaction.html) transactions. While significant in certain scenarios, two-phase commit remains relatively niche from an ETL standpoint.

Version 4's description is less clear, and the documentation doesn't venture much further than that. As it turns out, it doesn't refer to applying multiple transactions in parallel. Instead, it spreads the work of applying a single large transaction across multiple processes on the standby. To support this, new fields have been added to some existing messages. Again, this is a great feature for some workloads, but not very useful for a non-Postgres consumer that merely emulates a Postgres standby.
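For a consumer, opting into version 3 mostly means handling more message types. The sketch below lists the type bytes as documented in the logical replication message formats. The two-phase ones are only sent when the `two_phase` option is enabled. `minProtoVersion` is a hypothetical helper for illustration and does not validate unknown bytes.

```go
package main

import "fmt"

// streamingTypes: sent for streamed in-progress transactions
// (proto_version 2+, with the "streaming" option on).
var streamingTypes = map[byte]string{
	'S': "Stream Start",
	'E': "Stream Stop",
	'c': "Stream Commit",
	'A': "Stream Abort",
}

// twoPhaseTypes: added in proto_version 3, sent only with "two_phase" on.
var twoPhaseTypes = map[byte]string{
	'b': "Begin Prepare",
	'P': "Prepare",
	'K': "Commit Prepared",
	'r': "Rollback Prepared",
	'p': "Stream Prepare",
}

// minProtoVersion reports which protocol version introduced a message type,
// defaulting to 1 for anything not listed above.
func minProtoVersion(t byte) int {
	if _, ok := twoPhaseTypes[t]; ok {
		return 3
	}
	if _, ok := streamingTypes[t]; ok {
		return 2
	}
	return 1
}

func main() {
	for _, t := range []byte{'B', 'S', 'K'} {
		fmt.Printf("%c needs proto_version >= %d\n", t, minProtoVersion(t))
	}
}
```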

### **Conclusion**

Postgres logical replication is a powerful feature central to the distributed/HA Postgres ecosystem. Version 2 of the logical replication protocol streams in-flight transactions. Using it, we can efficiently manage WAL spikes during sizable transactions, improve logical decoding performance, and mitigate disk-full issues caused by replication slot growth. This approach also reduces the lag between the Postgres source and its readers.

At [PeerDB](https://www.peerdb.io/), we're developing a feature that uses version 2 of the logical replication protocol to consume changes from a Postgres database before they are committed. We believe this feature will significantly benefit Postgres users grappling with replication slot growth. Overall, version 2 of the logical replication protocol is a promising way to optimize Postgres replication and improve its reliability and performance.
