Inside logical replication in PostgreSQL: How it works

source link: https://www.postgresql.fastware.com/blog/inside-logical-replication-in-postgresql
Logical replication allows fine-grained control over both data replication and security. In this blog I'll go through the fundamentals of Logical Replication and some use cases. 

My paper on the internals of logical replication was one of 27 proposals selected from 120 submissions. During the event, I covered the following topics:

Introduction

Vigneshwaran C on stage at PGConf India 2023

Logical replication is a method of replicating data changes from publisher to subscriber. The node where a publication is defined is referred to as the publisher. The node where a subscription is defined is referred to as the subscriber. Logical replication allows fine-grained control over both data replication and security.

Logical replication uses a publish and subscribe model with one or more subscribers subscribing to one or more publications on a publisher node. Subscribers pull data from the publications they subscribe to and may subsequently re-publish data to allow cascading replication or more complex configurations.

Use cases

  • Sending incremental changes in a single database or a subset of a database to subscribers as they occur.
  • Firing triggers for individual changes as they arrive on the subscriber.
  • Consolidating multiple databases into a single one (e.g., for analytical purposes).
  • Replicating between different major versions of PostgreSQL.
  • Replicating between PostgreSQL instances on different platforms (e.g., Linux to Windows).
  • Giving access to replicated data to different groups of users.
  • Sharing a subset of the database between multiple databases.

Architecture

Below, I illustrate how logical replication works in PostgreSQL 15. I will refer back to this diagram later in this post.

img-dgm-logical-replcation-02

Publication

Publications can be defined on the primary node whose changes should be replicated. A publication is a set of changes generated from a table or a group of tables and might also be described as a change set or replication set. Each publication exists in only one database.

Each table can be added to multiple publications if needed. Publications can currently contain only tables and all tables in a schema.

Publications can choose to limit the changes they produce to any combination of INSERT, UPDATE, DELETE, and TRUNCATE, similar to how triggers are fired by particular event types. By default, all operation types are replicated.

When a publication is created, its information is added to the pg_publication catalog table:

postgres=# CREATE PUBLICATION pub_alltables FOR ALL TABLES;
CREATE PUBLICATION
postgres=# SELECT * FROM pg_publication;
  oid  |    pubname    | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-------+---------------+----------+--------------+-----------+-----------+-----------+-------------+------------
 16392 | pub_alltables |       10 | t            | t         | t         | t         | t           | f
(1 row)

Information about published tables is added to the pg_publication_rel catalog table:

postgres=# CREATE PUBLICATION pub_employee FOR TABLE employee;
CREATE PUBLICATION
postgres=# SELECT oid, prpubid, prrelid::regclass FROM pg_publication_rel;
  oid  | prpubid | prrelid
-------+---------+----------
 16407 |   16406 | employee
(1 row)

Information about schema publications is added to the pg_publication_namespace catalog table:

postgres=# CREATE PUBLICATION pub_sales_info FOR TABLES IN SCHEMA marketing, sales;
CREATE PUBLICATION
postgres=# SELECT oid, pnpubid, pnnspid::regnamespace FROM pg_publication_namespace;
  oid  | pnpubid |  pnnspid
-------+---------+-----------
 16410 |   16408 | marketing
 16411 |   16408 | sales
(2 rows)

Subscription

A subscription is the downstream side of logical replication. It defines the connection to another database and the set of publications (one or more) to which it wants to subscribe.

The subscriber database behaves in the same way as any other PostgreSQL instance, and can be used as a publisher for other databases by defining its own publications. A subscriber node may have multiple subscriptions.  It is possible to define multiple subscriptions between a single publisher-subscriber pair, in which case care must be taken to ensure that the subscribed publication objects don't overlap.

Each subscription will receive changes via one replication slot.  Additional replication slots may be required for the initial synchronization of pre-existing table data, which will be dropped at the end of data synchronization.

When a subscription is created, the subscription information will be added to the pg_subscription catalog table:

postgres=# CREATE SUBSCRIPTION sub_alltables
CONNECTION 'dbname=postgres host=localhost port=5432'
PUBLICATION pub_alltables;
NOTICE:  created replication slot "sub_alltables" on publisher
CREATE SUBSCRIPTION
postgres=# SELECT oid, subdbid, subname, subconninfo, subpublications FROM pg_subscription;
  oid  | subdbid |    subname       |               subconninfo                | subpublications
-------+---------+------------------+------------------------------------------+-----------------
 16393 |       5 | sub_alltables    | dbname=postgres host=localhost port=5432 | {pub_alltables}
(1 row)

The subscriber will connect to the publisher and get the list of tables that the publisher is publishing. In our earlier example, we created pub_alltables to publish data of all tables, so the published relations are added to the pg_subscription_rel catalog table:

postgres=# SELECT srsubid, srrelid::regclass FROM pg_subscription_rel;
 srsubid |    srrelid
---------+----------------
   16399 | accounts
   16399 | accounts_roles
   16399 | roles
   16399 | department
   16399 | employee
(5 rows)

The subscriber connects to the publisher and creates a replication slot, whose information is available in pg_replication_slots:

postgres=# SELECT slot_name, plugin, slot_type, datoid, database, temporary, active,
active_pid, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
 slot_name     | plugin   | slot_type | datoid | database | temporary | active | active_pid | restart_lsn | confirmed_flush_lsn
---------------+----------+-----------+--------+----------+-----------+--------+------------+-------------+---------------------
 sub_alltables | pgoutput | logical   |      5 | postgres | f         | t      |      24473 | 0/1550900   | 0/1550938          
(1 row)

Subscribers add the subscription statistics to pg_stat_subscription:

postgres=# SELECT subid, subname, received_lsn FROM pg_stat_subscription;
 subid |    subname    | received_lsn
-------+---------------+--------------
 16399 | sub_alltables | 0/1550938
(1 row)

The CREATE SUBSCRIPTION command returns to the user once this initial part is complete. The remaining work is done in the background by the replication launcher, walsender, apply worker, and tablesync workers.

Processes

Replication launcher

This process is started by the postmaster during the start of the instance. It will periodically check the pg_subscription catalog table to see if any subscriptions have been added or enabled.

The logical replication worker launcher uses the background worker infrastructure to start the logical replication workers for every enabled subscription.

vignesh 24438 /home/vignesh/postgres/inst/bin/postgres -D subscriber
vignesh 24439 postgres: checkpointer
vignesh 24440 postgres: background writer
vignesh 24442 postgres: walwriter
vignesh 24443 postgres: autovacuum launcher
vignesh 24444 postgres: logical replication launcher

Once the launcher process identifies that a new subscription has been created or enabled, it will start an apply worker process.

The running apply worker can be seen in the process list:

vignesh 24438 /home/vignesh/postgres/inst/bin/postgres -D subscriber
vignesh 24439 postgres: checkpointer
vignesh 24440 postgres: background writer
vignesh 24442 postgres: walwriter
vignesh 24443 postgres: autovacuum launcher
vignesh 24444 postgres: logical replication launcher
vignesh 24472 postgres: logical replication apply worker for subscription 16399
vignesh 24473 postgres: walsender vignesh postgres 127.0.0.1(55020) START_REPLICATION

The above information illustrates step 1 mentioned in the Architecture section above.

Apply worker

The apply worker will iterate through the table list and launch tablesync workers to synchronize the tables. Each table will be synchronized by one tablesync worker.

Multiple tablesync workers (one for each table) will run in parallel based on the max_sync_workers_per_subscription configuration.
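
As a rough illustration of the parallelism limit described above, here is a minimal Python sketch. It is not PostgreSQL code: sync_tables and copy_table are hypothetical names, a thread pool stands in for the background workers, and the state letters 'i' (initialize) and 'r' (ready) mirror the srsubstate values in pg_subscription_rel.

```python
from concurrent.futures import ThreadPoolExecutor


def sync_tables(tables, copy_table, max_sync_workers=2):
    """Toy model: copy each table with at most max_sync_workers running at once.

    copy_table is a hypothetical callable that copies one table's initial data.
    Returns a dict of table name -> state ('i' = initialize, 'r' = ready).
    """
    states = {table: "i" for table in tables}

    def tablesync_worker(table):
        copy_table(table)  # one worker handles exactly one table
        return table

    # The pool size plays the role of max_sync_workers_per_subscription.
    with ThreadPoolExecutor(max_workers=max_sync_workers) as pool:
        for table in pool.map(tablesync_worker, tables):
            states[table] = "r"
    return states
```

With five tables and a limit of two, no more than two copies run at the same time, and the remaining tables wait for a free worker.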

img-dgm-logical-replication-apply-worker-01

The apply worker will wait until the tablesync worker has copied the initial table data and the table state in pg_subscription_rel is set to ready (srsubstate = 'r').

postgres=# SELECT srsubid, srrelid::regclass, srsubstate, srsublsn FROM pg_subscription_rel;
 srsubid |    srrelid     | srsubstate | srsublsn 
---------+----------------+------------+-----------
   16399 | accounts       | r          | 0/156B8D0
   16399 | accounts_roles | r          | 0/156B8D0
   16399 | department     | r          | 0/156B940
   16399 | employee       | r          | 0/156B940
   16399 | roles          | r          | 0/156B978
(5 rows)

The above information illustrates step 2 mentioned in the Architecture section above.

Note: Currently, DDL operations are not supported by logical replication. Only DML changes will be replicated. 

Tablesync worker

  • The initial data synchronization is done separately for each table, by a separate tablesync worker.
  • Each tablesync worker creates a replication slot with the USE_SNAPSHOT option and copies the table data with the COPY command.
  • The tablesync worker then requests the publisher to start replicating changes.
  • The tablesync worker applies changes from the walsender until it catches up to the LSN set for it by the apply worker.

img-dgm-logical-replication-tablesync-worker-01

The above information illustrates step 3 mentioned in the Architecture section above.

Walsender

The walsender is started when the subscriber connects to the publisher and requests WAL. It then reads the WAL record by record, and decodes it to get the tuple data and size.

The changes are queued in the reorder buffer, which collects the individual pieces of each transaction in the order they are written to the WAL. When a transaction completes, the reorder buffer reassembles it and calls the output plugin with its changes. If the buffered changes exceed logical_decoding_work_mem, the largest transaction is identified and spilled to disk.

If streaming is enabled, then this transaction data will be sent to the subscriber, but will be applied in the subscriber only after the transaction is committed in the publisher.
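
The buffering and spilling behaviour (without streaming) can be pictured with a small Python model. This is only a sketch under simplifying assumptions: ReorderBufferSketch and its methods are hypothetical names, sizes are plain integers, and "disk" is just a second dictionary.

```python
from collections import defaultdict


class ReorderBufferSketch:
    """Toy model: buffer changes per transaction, spill the largest when over the limit."""

    def __init__(self, work_mem_bytes):
        self.work_mem_bytes = work_mem_bytes   # stands in for logical_decoding_work_mem
        self.in_memory = defaultdict(list)     # xid -> [(change, size), ...]
        self.spilled = defaultdict(list)       # xid -> changes evicted "to disk"

    def memory_used(self):
        return sum(size for changes in self.in_memory.values() for _, size in changes)

    def add_change(self, xid, change, size):
        self.in_memory[xid].append((change, size))
        while self.memory_used() > self.work_mem_bytes:
            # Pick the transaction that currently holds the most memory.
            largest = max(self.in_memory,
                          key=lambda x: sum(s for _, s in self.in_memory[x]))
            self.spilled[largest].extend(self.in_memory.pop(largest))

    def commit(self, xid):
        """Reassemble the transaction in WAL order for the output plugin."""
        changes = self.spilled.pop(xid, []) + self.in_memory.pop(xid, [])
        return [change for change, _ in changes]
```

Spilled changes always precede the in-memory ones of the same transaction, so concatenating them at commit preserves the original order.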

Once the transaction is committed, the walsender performs the following:

  • Checks if this relation should be published (based on ALL TABLES, the TABLE list, or the TABLES IN SCHEMA list specified in the publication).
  • Checks if this operation should be published (based on what the user has specified for the publish option: insert, update, delete, truncate).
  • Changes the published relation ID if publish_via_partition_root is set. In this case, the relation ID of the ancestor will be sent.
  • Checks if this row should be sent, based on the condition specified by the row filter.
  • Checks if this column should be sent, based on the column list specified.
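
The sequence of checks above can be sketched in Python as follows. This is an illustration rather than the pgoutput implementation: the Publication class and the decide function are hypothetical, rows are plain dictionaries, and the publish_via_partition_root step is left out for brevity.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Publication:
    """Hypothetical, simplified stand-in for a publication definition."""
    tables: set
    operations: set = field(
        default_factory=lambda: {"insert", "update", "delete", "truncate"})
    row_filter: Optional[Callable[[dict], bool]] = None
    column_list: Optional[list] = None


def decide(pub, table, operation, row):
    """Return the column values to send, or None if the change is skipped."""
    if table not in pub.tables:            # relation not published
        return None
    if operation not in pub.operations:    # operation not in the publish option
        return None
    if pub.row_filter is not None and not pub.row_filter(row):
        return None                        # row filter rejects the row
    if pub.column_list is None:            # no column list: send every column
        return dict(row)
    return {col: row[col] for col in pub.column_list}
```

For example, a publication on employee that publishes only inserts, filters on an active flag, and lists two columns would send just those two columns for an active inserted row and nothing for a delete.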

The walsender then updates the statistics like txn count, txn bytes, spill count, spill bytes, spill txns, stream count, stream bytes, stream txns.

The above information illustrates steps 7 and 8 mentioned in the Architecture section above.

Replicating incremental changes

Incremental changes are handled by the walsender and the apply worker, as described below.

img-dgm-logical-replication-replicationg-incremental-changes-01

The table above details step 9 mentioned in the Architecture diagram.

Apply worker failure handling

If the apply worker fails due to an error, the apply worker process will exit. During its normal operation, the apply worker will have maintained the origin LSN during the last transaction commit.

The replication launcher will periodically check if the subscription worker is running. If the launcher identifies that it is not, then it will restart the worker for the subscription. The apply worker will issue START_REPLICATION, requesting streaming from the last origin LSN that was committed. The walsender will start streaming transactions from the origin LSN (last committed transaction) requested by the apply worker.

Whenever the apply worker encounters a constraint error, such as a unique constraint or check constraint violation, it will exit and the steps mentioned above are repeated.

2023-02-22 11:55:51.479 IST [21204] ERROR:  duplicate key value violates unique constraint "employee_pkey"
2023-02-22 11:55:51.479 IST [21204] DETAIL:  Key (eid)=(1) already exists.
2023-02-22 11:55:51.479 IST [21204] CONTEXT:  processing remote data for replication origin "pg_16395"
during message type "INSERT" for replication target relation "public.employee" in transaction 751,
finished at 0/1562C10

In case of such errors, the user can skip the failing transaction by setting the skip LSN to the finish LSN of that transaction. When a skip LSN is set, the apply worker checks whether the incoming transaction matches the specified LSN, skips that transaction, and proceeds to the next one.

postgres=# ALTER SUBSCRIPTION sub_alltables SKIP (lsn = '0/1562C10');
ALTER SUBSCRIPTION

Instead of retrying repeatedly, the user can set the disable_on_error subscription option. In this case, any error in the apply worker is caught (using PG_TRY()/PG_CATCH()), and the subscription is disabled before the apply worker exits. As the subscription is disabled, the launcher will not restart the apply worker for the subscription.

postgres=# ALTER SUBSCRIPTION sub_alltables SET (DISABLE_ON_ERROR = 'on');
ALTER SUBSCRIPTION
postgres=# SELECT oid, subname, subdisableonerr, subpublications FROM pg_subscription;
  oid  |    subname    | subdisableonerr | subpublications
-------+---------------+-----------------+-----------------
 16395 | sub_alltables | t               | {pub_alltables}
(1 row)
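
The failure handling described in this section (retry from the last origin LSN, skipping by LSN, and disable_on_error) can be modelled with a short Python sketch. Everything here is hypothetical and simplified: SubscriptionSketch and launcher_cycle are illustrative names, LSNs are plain integers, each transaction carries a single key, and a duplicate key stands in for any constraint error.

```python
class SubscriptionSketch:
    """Toy model of one subscription and its apply worker state."""

    def __init__(self, disable_on_error=False):
        self.enabled = True
        self.disable_on_error = disable_on_error
        self.origin_lsn = 0        # progress recorded at the last applied commit
        self.skip_lsn = None       # set by the user to skip one transaction
        self.applied_keys = []

    def apply_worker(self, stream):
        """stream: iterable of (finish_lsn, key) in commit order."""
        try:
            for finish_lsn, key in stream:
                if finish_lsn <= self.origin_lsn:
                    continue                   # already applied before a restart
                if finish_lsn == self.skip_lsn:
                    self.skip_lsn = None       # skip this transaction once
                else:
                    if key in self.applied_keys:
                        raise ValueError(f"duplicate key {key}")
                    self.applied_keys.append(key)
                self.origin_lsn = finish_lsn
        except ValueError:
            if self.disable_on_error:
                self.enabled = False           # disable before the worker exits
            raise


def launcher_cycle(sub, stream):
    """One launcher check: run the worker if enabled; True if it ran cleanly."""
    if not sub.enabled:
        return False
    try:
        sub.apply_worker(stream)
        return True
    except ValueError:
        return False
```

Without a skip LSN the worker fails on the same transaction in every cycle. Setting skip_lsn lets it move past that transaction, and with disable_on_error the launcher stops restarting it after the first failure.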

Altering a subscription

img-dgm-logical-replication-altering-a-subscription-01

The apply worker will periodically check the current subscription values against the new ones. If they have been changed:

  • The apply worker will exit.
  • The launcher will restart the apply worker.
  • The apply worker will load the new subscription values from the pg_subscription system table.
  • The apply worker will apply the changes using the newly modified values.

How synchronous_commit is achieved

On the subscriber, create a subscription with the synchronous_commit option set to 'on'.

On the publisher, use ALTER SYSTEM SET to set synchronous_standby_names to the subscription name, and reload the configuration using pg_reload_conf(). Then verify in pg_stat_replication that sync_state is 'sync' for the subscription (reported as is_sync in the query below).

Subscriber

postgres=# CREATE SUBSCRIPTION sync
CONNECTION 'dbname=postgres host=localhost port=5432'
PUBLICATION sync
WITH (synchronous_commit = 'on');
NOTICE:  created replication slot "sync" on publisher
CREATE SUBSCRIPTION

Publisher

postgres=# ALTER SYSTEM SET synchronous_standby_names TO 'sync';
ALTER SYSTEM
postgres=# SELECT pg_reload_conf();
 pg_reload_conf
----------------
 t
(1 row)
postgres=# SELECT application_name, sync_state = 'sync' AS is_sync
FROM pg_stat_replication
WHERE application_name = 'sync';
 application_name | is_sync
------------------+---------
 sync             | t
(1 row)

img-dgm-logical-replication-how-synchronous-commit-is-achieved-01

Replication slot

A replication slot ensures that the publisher retains the WAL needed by its subscribers, even while they are disconnected from the publisher.

As mentioned earlier, each (active) subscription receives changes from a replication slot on the remote (publishing) side.

Additional table synchronization slots are normally transient, created internally to perform initial table synchronization, and dropped automatically when they are no longer needed.

Normally, the remote replication slot is created automatically when the subscription is created during CREATE SUBSCRIPTION, and it is dropped automatically when the subscription is dropped during DROP SUBSCRIPTION.

Replication slots provide an automated way to ensure that the primary does not remove WAL segments until they have been received by all standbys.

Row filters

When creating a publication, a WHERE clause can be specified. This information is stored in the pg_publication_rel catalog table:

postgres=# CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
CREATE PUBLICATION
postgres=# SELECT oid, prpubid, prrelid, pg_get_expr (prqual, prrelid) FROM pg_publication_rel;
  oid  | prpubid | prrelid |   pg_get_expr
-------+---------+---------+------------------
 16457 |   16456 |   16426 | (active IS TRUE)
(1 row)

Rows that don't satisfy this WHERE clause will be filtered by the publisher. This allows a set of tables to be partially replicated.

During table synchronization, only the table data that satisfies the row filter will be copied to the subscriber. 

img-dgm-logical-replication-row-filters-01

If the subscription has several publications in which the same table has been published with different filters (for the same publish operation), the expressions are ORed together, and rows satisfying any of the expressions are replicated.

If the subscription has several publications, and at least one of them was specified using ALL TABLES or TABLES IN SCHEMA and the table belongs to the referred schema, then those publications take precedence, and the publication behaves as if there are no row filters. 
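
The two rules above for combining row filters can be sketched as a single Python function. This is a simplified illustration: combined_filter is a hypothetical name, each filter is a Python predicate, and None marks a publication that publishes the table without a row filter (for example via ALL TABLES or TABLES IN SCHEMA).

```python
def combined_filter(filters):
    """Combine the row filters of all publications that publish one table.

    filters: list of predicates (row -> bool), or None for a publication
    that publishes the table without any row filter.
    """
    if any(f is None for f in filters):
        # An unfiltered publication takes precedence: every row is replicated.
        return lambda row: True
    # Otherwise the filter expressions are ORed together.
    return lambda row: any(f(row) for f in filters)
```

A row is replicated if at least one filter accepts it, and the filters are ignored entirely as soon as one publication publishes the table unfiltered.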

Replication filters transformation

For insert operations, the publisher checks if the new row satisfies the row filter condition to determine if the new record should be sent to the subscriber or skipped.

For delete operations, the publisher checks if the row satisfies the row filter condition to determine if the operation should be sent to the subscriber or skipped.

The update operation is handled in a slightly different manner:

  • If neither the old row nor the new one match the row filter condition: Update is skipped.
  • If the old row does not satisfy the row filter condition, but the new one does: Transform the update to insertion of new row on the subscriber.
  • If the old row satisfies the row filter condition but the new one does not: Transform the update into deletion of old row from the subscriber.
  • If both the old row and the new one satisfy the row filter condition: Send the data as an update to the subscriber, without any transformation.
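
The four update cases can be written as a small decision function. This Python sketch is only an illustration of the rules listed above: transform_update is a hypothetical name, rows are dictionaries, and the row filter is a Python predicate.

```python
def transform_update(old_row, new_row, row_filter):
    """Decide what the subscriber should receive for an UPDATE.

    Returns None when the update is skipped, otherwise a tuple of
    (operation, row) with operation in {"insert", "delete", "update"}.
    """
    old_match = row_filter(old_row)
    new_match = row_filter(new_row)
    if not old_match and not new_match:
        return None                    # row was never visible to the subscriber
    if not old_match and new_match:
        return ("insert", new_row)     # row enters the filtered set
    if old_match and not new_match:
        return ("delete", old_row)     # row leaves the filtered set
    return ("update", new_row)         # row stays in the filtered set
```

With a filter such as active IS TRUE, deactivating a department reaches the subscriber as a delete, and reactivating it reaches the subscriber as an insert.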

Column lists

When creating a publication, a column list clause can be specified. This information is stored in the pg_publication_rel catalog table:

postgres=# CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname);
CREATE PUBLICATION
postgres=# SELECT * FROM pg_publication_rel;
  oid  | prpubid | prrelid | prqual | prattrs
-------+---------+---------+--------+---------
 16453 |   16452 |   16436 |        | 1 2
(1 row)

postgres=# SELECT * FROM pg_publication_tables;
    pubname     | schemaname | tablename |       attnames       | rowfilter
----------------+------------+-----------+----------------------+-----------
 users_filtered | public     | users     | {user_id, firstname} |
(1 row)

Columns not included in this list are not sent to the subscriber. This allows the schema on the subscriber to be a subset of the publisher schema, as illustrated below.

img-dgm-logical-replication-column-lists-01

During the initial table synchronization, only columns included in the column list are copied to the subscriber.

When sending incremental transaction changes, the publisher will check for the relation information and send to the subscriber the values only for the columns that match the specified column list.

For partitioned tables, publish_via_partition_root determines whether the column list for the root or leaf relation will be used. If the parameter is 'false' (the default), the list defined for the leaf relation is used. Otherwise, the column list of the root partitioned table is used.
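
The choice between the leaf and root column lists can be illustrated with a few lines of Python. This is a sketch with hypothetical names (columns_to_send, leaf_list, root_list), where a column list of None means that no column list was specified.

```python
def columns_to_send(row, leaf_list, root_list, publish_via_partition_root=False):
    """Project a changed row onto the applicable column list.

    leaf_list / root_list: column lists defined for the leaf partition and
    for the root partitioned table (None = no column list, send everything).
    """
    chosen = root_list if publish_via_partition_root else leaf_list
    if chosen is None:
        return dict(row)
    return {col: row[col] for col in chosen}
```

With the default setting the leaf partition's list decides which values are sent. Setting publish_via_partition_root switches to the list of the root.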

Specifying a column list when the publication also publishes FOR TABLES IN SCHEMA is not supported.
There's currently no support for subscriptions comprising several publications where the same table has been published with different column lists.

Advantages of row filters and column lists

The row filter and column list features provide the following advantages:

  • Reduces network traffic (and increases performance) by replicating only a small subset of a large data table.
  • Provides only the data that is relevant to a subscriber node.
  • Acts as a form of security by hiding sensitive information (e.g., not replicating credit card numbers).

Replicating TABLES IN SCHEMA

One or more schemas can be specified in FOR TABLES IN SCHEMA. This information is maintained in the pg_publication_namespace catalog table:

postgres=# CREATE PUBLICATION sales_publication FOR TABLES IN SCHEMA marketing, sales;
CREATE PUBLICATION
postgres=# SELECT oid, pnpubid, pnnspid::regnamespace FROM pg_publication_namespace;
  oid  | pnpubid | pnnspid
-------+---------+------------
 16450 |   16449 | marketing
 16451 |   16449 | sales
(2 rows)

During the initial table synchronization, only tables that belong to the specified schema are copied to the subscriber. When sending the incremental transaction changes, the publisher will check if this transaction’s relation belongs to one of the schemas and publish only those changes.

If the subscription has several publications, and at least one was specified using ALL TABLES, then those publications will be given higher precedence and all the table data will be sent to the subscription.

Any new table created in the schema after creation of the publication will be automatically added to it. Similarly, tables removed from the schema will be automatically removed from the publication. But data of newly created tables (after creation of subscription) will not be replicated automatically - the user will have to run ALTER SUBSCRIPTION … REFRESH PUBLICATION, which will fetch the missing tables and take care of synchronizing the data from the publisher.
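
The difference between what the publication contains and what the subscription knows about can be sketched in Python. This is a simplified model with hypothetical names: published_tables and refresh_publication are not PostgreSQL functions, and tables are represented as (schema, table) pairs.

```python
def published_tables(catalog, schemas):
    """Tables currently covered by a FOR TABLES IN SCHEMA publication.

    catalog: set of (schema, table) pairs that exist on the publisher.
    """
    return {(schema, table) for schema, table in catalog if schema in schemas}


def refresh_publication(subscribed, catalog, schemas):
    """Model of a refresh: compare the subscriber's table list with the publisher's.

    Returns (new subscribed set, tables that need initial sync, tables to drop).
    """
    current = published_tables(catalog, schemas)
    to_sync = current - subscribed     # created on the publisher since the last refresh
    to_drop = subscribed - current     # no longer part of the publication
    return current, to_sync, to_drop
```

Membership on the publisher follows the schema contents automatically, while the subscriber's list changes only when a refresh is run.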

ALL TABLES replication is similar to TABLES IN SCHEMA publication, except that it will replicate data from all tables, instead of just the tables present in the schema.
