Citus Tutorial
In this guide we’ll create a Citus cluster with a coordinator node and three workers. Every node will have a secondary for failover. We’ll simulate failure in the coordinator and worker nodes and see how the system continues to function.
This tutorial uses docker compose in order to separate the architecture design from some of the implementation details. This allows reasoning at the architecture level within this tutorial, and makes it easier to see which software component needs to be deployed and run on which node.
The setup provided in this tutorial is good for replaying at home in the lab. It is not intended to be production ready, though. In particular, no attention has been spent on volume management. After all, this is a tutorial: the goal is to walk through the first steps of using pg_auto_failover to provide HA to a Citus formation.
Pre-requisites
When using docker compose we describe a list of services. Each service may run on one or more nodes, and each node runs a single isolated process in its own container.
Within the context of a tutorial, or even a development environment, this maps well to provisioning separate physical machines on-prem, or Virtual Machines either on-prem or in a Cloud service.
The docker image used in this tutorial is named pg_auto_failover:citus. It can be built locally using the Dockerfile found in the GitHub repository for pg_auto_failover.
To build the image, either use the provided Makefile and run make build, or run the docker build command directly:
$ git clone https://github.com/citusdata/pg_auto_failover
$ cd pg_auto_failover/docs/cluster
$ docker build -t pg_auto_failover:citus -f Dockerfile ../..
$ docker compose build
Our first Citus Cluster
To create a cluster we use the following docker compose definition:
version: "3.9" # optional since v1.27.0

services:

  monitor:
    image: pg_auto_failover:citus
    environment:
      PGDATA: /tmp/pgaf
    command: |
      pg_autoctl create monitor --ssl-self-signed --auth trust --run
    expose:
      - 5432

  coord:
    image: pg_auto_failover:citus
    environment:
      PGDATA: /tmp/pgaf
      PGUSER: citus
      PGDATABASE: citus
      PG_AUTOCTL_MONITOR: "postgresql://autoctl_node@monitor/pg_auto_failover"
    expose:
      - 5432
    command: |
      pg_autoctl create coordinator --ssl-self-signed --auth trust --pg-hba-lan --run

  worker:
    image: pg_auto_failover:citus
    environment:
      PGDATA: /tmp/pgaf
      PGUSER: citus
      PGDATABASE: citus
      PG_AUTOCTL_MONITOR: "postgresql://autoctl_node@monitor/pg_auto_failover"
    expose:
      - 5432
    command: |
      pg_autoctl create worker --ssl-self-signed --auth trust --pg-hba-lan --run
To run the full Citus cluster with HA from this definition, we can use the following command:
$ docker compose up --scale coord=2 --scale worker=6
The command above starts the services up. It also specifies a --scale option that is different for each service. We need:
one monitor node, which is the default scale of 1 for a service, so no option is needed there,
one primary Citus coordinator node and one secondary Citus coordinator node, which is to say two coordinator nodes,
and three Citus worker nodes, each worker with both a primary Postgres node and a secondary Postgres node, so that's a scale of 6 here.
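The arithmetic behind those --scale values can be written down as a tiny helper. This is a hypothetical function for illustration only; it is not part of pg_auto_failover or docker compose, and it assumes two Postgres nodes (primary plus secondary) per group, as in this tutorial.

```python
# Hypothetical helper: derive the --scale flags used above from the
# intended topology. Not part of pg_auto_failover or docker compose.


def scale_options(worker_groups: int, nodes_per_group: int = 2,
                  coordinator_nodes: int = 2) -> str:
    """Return the --scale flags for the simple (unnamed) compose file.

    The monitor keeps the default scale of 1, so it is not listed.
    """
    # Each worker group needs a primary and a secondary Postgres node.
    worker_nodes = worker_groups * nodes_per_group
    return f"--scale coord={coordinator_nodes} --scale worker={worker_nodes}"


if __name__ == "__main__":
    # Three worker groups, each with a primary and a secondary.
    print("docker compose up " + scale_options(worker_groups=3))
```

With three worker groups this rebuilds the exact command line used above.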
The default policy for the pg_auto_failover monitor is to assign a primary and a secondary per auto failover group. In our case, since every worker node is provisioned with the same command, we benefit from that default policy:
$ pg_autoctl create worker --ssl-self-signed --auth trust --pg-hba-lan --run
When provisioning a production cluster, it is often necessary to have better control over which node participates in which group, which is done with the --group N option of the pg_autoctl create worker command line.
Within a given group, the first node that registers is a primary, and the other nodes are secondary nodes. The monitor takes care of this so that we don't have to. In a High Availability setup, every node should be ready to be promoted to primary at any time, so knowing which node in a group is assigned primary first is not very interesting.
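The registration rule above can be modelled in a few lines. This is a simplified sketch that ASSUMES the monitor fills each worker group up to two nodes before opening the next one; it illustrates the "first node in a group is primary" rule and is not the monitor's actual algorithm. The function name is hypothetical.

```python
# Simplified model, ASSUMING the monitor fills each group up to
# nodes_per_group nodes before opening the next group. Illustration only,
# not the monitor's real implementation.
from collections import defaultdict


def assign_roles(registrations, nodes_per_group=2, first_worker_group=1):
    """Map each registering worker (in arrival order) to (group, role)."""
    members = defaultdict(list)
    assignments = {}
    group = first_worker_group
    for name in registrations:
        if len(members[group]) >= nodes_per_group:
            group += 1  # current group is full, open the next one
        members[group].append(name)
        # The first node to register in a group becomes its primary.
        role = "primary" if len(members[group]) == 1 else "secondary"
        assignments[name] = (group, role)
    return assignments
```

Under this model, six anonymous worker containers end up as three groups of one primary and one secondary, which matches the first state listing shown below.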
While the cluster is being provisioned by docker compose, you can run the following command to get a dynamic dashboard of what's happening. It is like top for pg_auto_failover:
$ docker compose exec monitor pg_autoctl watch
The pg_basebackup operation used to create the secondary nodes takes some time with Citus, because the first CHECKPOINT is quite slow. So at first, when inquiring about the cluster state, you might see the following output:
$ docker compose exec monitor pg_autoctl show state
Name | Node | Host:Port | TLI: LSN | Connection | Reported State | Assigned State
---------+-------+-------------------+----------------+--------------+---------------------+--------------------
coord0a | 0/1 | cd52db444544:5432 | 1: 0/200C4A0 | read-write | wait_primary | wait_primary
coord0b | 0/2 | 66a31034f2e4:5432 | 1: 0/0 | none ! | wait_standby | catchingup
worker1a | 1/3 | dae7c062e2c1:5432 | 1: 0/2003B18 | read-write | wait_primary | wait_primary
worker1b | 1/4 | 397e6069b09b:5432 | 1: 0/0 | none ! | wait_standby | catchingup
worker2a | 2/5 | 5bf86f9ef784:5432 | 1: 0/2006AB0 | read-write | wait_primary | wait_primary
worker2b | 2/6 | 23498b801a61:5432 | 1: 0/0 | none ! | wait_standby | catchingup
worker3a | 3/7 | c23610380024:5432 | 1: 0/2003B18 | read-write | wait_primary | wait_primary
worker3b | 3/8 | 2ea8aac8a04a:5432 | 1: 0/0 | none ! | wait_standby | catchingup
After a while though (typically around a minute or less), you can run that same command again and see a stable result:
$ docker compose exec monitor pg_autoctl show state
Name | Node | Host:Port | TLI: LSN | Connection | Reported State | Assigned State
---------+-------+-------------------+----------------+--------------+---------------------+--------------------
coord0a | 0/1 | cd52db444544:5432 | 1: 0/3127AD0 | read-write | primary | primary
coord0b | 0/2 | 66a31034f2e4:5432 | 1: 0/3127AD0 | read-only | secondary | secondary
worker1a | 1/3 | dae7c062e2c1:5432 | 1: 0/311B610 | read-write | primary | primary
worker1b | 1/4 | 397e6069b09b:5432 | 1: 0/311B610 | read-only | secondary | secondary
worker2a | 2/5 | 5bf86f9ef784:5432 | 1: 0/311B610 | read-write | primary | primary
worker2b | 2/6 | 23498b801a61:5432 | 1: 0/311B610 | read-only | secondary | secondary
worker3a | 3/7 | c23610380024:5432 | 1: 0/311B648 | read-write | primary | primary
worker3b | 3/8 | 2ea8aac8a04a:5432 | 1: 0/311B648 | read-only | secondary | secondary
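Instead of re-running the command by hand until the output settles, a script can parse the table and poll. The sketch below assumes the pipe-separated layout shown in this tutorial (one header row, one rule row) and treats the cluster as stable once every node reports the state it was assigned and that state is primary or secondary. The function names are hypothetical.

```python
# Minimal sketch: parse `pg_autoctl show state` output and wait until the
# cluster converges. Assumes the table layout shown in this tutorial.
import subprocess
import time


def parse_state(table: str) -> list:
    """Turn the pipe-separated table into one dict per node."""
    lines = [line for line in table.strip().splitlines() if line.strip()]
    header = [cell.strip() for cell in lines[0].split("|")]
    rows = []
    for line in lines[2:]:  # skip the header and the ----+---- rule
        cells = [cell.strip() for cell in line.split("|")]
        rows.append(dict(zip(header, cells)))
    return rows


def is_stable(rows: list) -> bool:
    """True when every node reports the state the monitor assigned to it
    and that state is a steady one (primary or secondary)."""
    return bool(rows) and all(
        row["Reported State"] == row["Assigned State"]
        and row["Assigned State"] in ("primary", "secondary")
        for row in rows
    )


def wait_until_stable(timeout: float = 120.0, interval: float = 5.0) -> bool:
    """Poll the monitor until the state table is stable or time runs out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # -T disables TTY allocation, which a non-interactive script needs.
        result = subprocess.run(
            ["docker", "compose", "exec", "-T", "monitor",
             "pg_autoctl", "show", "state"],
            capture_output=True, text=True, check=True,
        )
        if is_stable(parse_state(result.stdout)):
            return True
        time.sleep(interval)
    return False
```

Against the first listing above, is_stable returns False (wait_primary and catchingup states). Against the second, it returns True.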
You can see from the above that the coordinator node has a primary and secondary instance for high availability. When connecting to the coordinator, clients should try connecting to whichever instance is running and supports reads and writes.
We can review the available Postgres URIs with the pg_autoctl show uri command:
$ docker compose exec monitor pg_autoctl show uri
Type | Name | Connection String
-------------+---------+-------------------------------
monitor | monitor | postgres://autoctl_node@552dd89d5d63:5432/pg_auto_failover?sslmode=require
formation | default | postgres://66a31034f2e4:5432,cd52db444544:5432/citus?target_session_attrs=read-write&sslmode=require
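The formation URI is a multi-host libpq connection string: several host:port pairs separated by commas, followed by the database name and query parameters. Python's urllib cannot compute a .port for such a netloc, so this minimal sketch splits it by hand. The function name is hypothetical, and IPv6 host literals are not handled.

```python
# Minimal sketch: split a multi-host libpq URI into hosts, database and
# parameters. IPv6 literals are not handled.
from urllib.parse import parse_qs, urlsplit


def parse_formation_uri(uri: str):
    parts = urlsplit(uri)
    netloc = parts.netloc.rsplit("@", 1)[-1]  # drop an optional user@ prefix
    hosts = []
    for item in netloc.split(","):
        host, _, port = item.partition(":")
        hosts.append((host, int(port) if port else 5432))
    params = {key: values[0] for key, values in parse_qs(parts.query).items()}
    return hosts, parts.path.lstrip("/"), params
```

Applied to the formation URI above, it yields both coordinator hosts in order, the citus database, and the target_session_attrs=read-write and sslmode=require parameters.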
To check that Citus worker nodes have been registered to the coordinator, we can run a psql session right from the coordinator container:
$ docker compose exec coord psql -d citus -c 'select * from citus_get_active_worker_nodes();'
node_name | node_port
--------------+-----------
dae7c062e2c1 | 5432
5bf86f9ef784 | 5432
c23610380024 | 5432
(3 rows)
We are now reaching the limits of using a simplified docker compose setup. When using the --scale option, it is not possible to give a specific hostname to each running node, so we get randomly generated strings instead of useful node names such as worker1a or worker3b.
Create a Citus Cluster, take two
In order to implement the following architecture, we need to introduce a more complex docker compose file than in the previous section.
pg_auto_failover architecture with a Citus formation
This time we create a cluster using the following docker compose definition:
x-coord: &coordinator
  image: pg_auto_failover:citus
  environment:
    PGDATA: /tmp/pgaf
    PGUSER: citus
    PGDATABASE: citus
    PG_AUTOCTL_HBA_LAN: true
    PG_AUTOCTL_AUTH_METHOD: "trust"
    PG_AUTOCTL_SSL_SELF_SIGNED: true
    PG_AUTOCTL_MONITOR: "postgresql://autoctl_node@monitor/pg_auto_failover"
  expose:
    - 5432

x-worker: &worker
  image: pg_auto_failover:citus
  environment:
    PGDATA: /tmp/pgaf
    PGUSER: citus
    PGDATABASE: citus
    PG_AUTOCTL_HBA_LAN: true
    PG_AUTOCTL_AUTH_METHOD: "trust"
    PG_AUTOCTL_SSL_SELF_SIGNED: true
    PG_AUTOCTL_MONITOR: "postgresql://autoctl_node@monitor/pg_auto_failover"
  expose:
    - 5432

services:
  app:
    build:
      context: .
      dockerfile: Dockerfile.app
    environment:
      PGUSER: citus
      PGDATABASE: citus
      PGHOST: coord0a,coord0b
      PGPORT: 5432
      PGAPPNAME: demo
      PGSSLMODE: require
      PGTARGETSESSIONATTRS: read-write

  monitor:
    image: pg_auto_failover:citus
    environment:
      PGDATA: /tmp/pgaf
      PG_AUTOCTL_SSL_SELF_SIGNED: true
    expose:
      - 5432
    command: |
      pg_autoctl create monitor --auth trust --run

  coord0a:
    <<: *coordinator
    hostname: coord0a
    command: |
      pg_autoctl create coordinator --name coord0a --run

  coord0b:
    <<: *coordinator
    hostname: coord0b
    command: |
      pg_autoctl create coordinator --name coord0b --run

  worker1a:
    <<: *worker
    hostname: worker1a
    command: |
      pg_autoctl create worker --group 1 --name worker1a --run

  worker1b:
    <<: *worker
    hostname: worker1b
    command: |
      pg_autoctl create worker --group 1 --name worker1b --run

  worker2a:
    <<: *worker
    hostname: worker2a
    command: |
      pg_autoctl create worker --group 2 --name worker2a --run

  worker2b:
    <<: *worker
    hostname: worker2b
    command: |
      pg_autoctl create worker --group 2 --name worker2b --run

  worker3a:
    <<: *worker
    hostname: worker3a
    command: |
      pg_autoctl create worker --group 3 --name worker3a --run

  worker3b:
    <<: *worker
    hostname: worker3b
    command: |
      pg_autoctl create worker --group 3 --name worker3b --run
This definition is a little more involved than the previous one. We take advantage of YAML anchors and aliases to define a template for our coordinator nodes and worker nodes, and then apply that template to the actual nodes.
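The `<<: *worker` merge key behaves like a shallow dictionary merge in which the service's own keys win over the template's. The sketch below models that in Python and generates the six worker services programmatically. The template only mirrors part of the x-worker anchor, and the helper name is hypothetical.

```python
# Model of `<<: *worker`: a shallow merge where the service's own keys
# override the template. WORKER_TEMPLATE mirrors part of x-worker.

WORKER_TEMPLATE = {
    "image": "pg_auto_failover:citus",
    "environment": {"PGDATA": "/tmp/pgaf", "PGUSER": "citus"},
    "expose": [5432],
}


def worker_service(group: int, suffix: str) -> dict:
    name = f"worker{group}{suffix}"
    overrides = {
        "hostname": name,
        # A `command: |` block scalar keeps its trailing newline.
        "command": f"pg_autoctl create worker --group {group} --name {name} --run\n",
    }
    # YAML merge keys are shallow: a nested mapping such as "environment"
    # is taken as a whole from whichever side defines it, not merged key by key.
    return {**WORKER_TEMPLATE, **overrides}


services = {
    f"worker{g}{s}": worker_service(g, s) for g in (1, 2, 3) for s in ("a", "b")
}
```

Because the merge is shallow, a service that wanted one extra environment variable would have to repeat the whole environment mapping, not only the new key.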
Also, this time we provision an application service (named "app") that sits in the background and allows us to later connect to our current primary coordinator. See Dockerfile.app for the complete definition of this service.
We start this cluster with a simplified command line this time:
$ docker compose up
And this time we get the following cluster as a result:
$ docker compose exec monitor pg_autoctl show state
Name | Node | Host:Port | TLI: LSN | Connection | Reported State | Assigned State
---------+-------+---------------+----------------+--------------+---------------------+--------------------
coord0a | 0/3 | coord0a:5432 | 1: 0/312B040 | read-write | primary | primary
coord0b | 0/4 | coord0b:5432 | 1: 0/312B040 | read-only | secondary | secondary
worker1a | 1/1 | worker1a:5432 | 1: 0/311C550 | read-write | primary | primary
worker1b | 1/2 | worker1b:5432 | 1: 0/311C550 | read-only | secondary | secondary
worker2b | 2/7 | worker2b:5432 | 2: 0/5032698 | read-write | primary | primary
worker2a | 2/8 | worker2a:5432 | 2: 0/5032698 | read-only | secondary | secondary
worker3a | 3/5 | worker3a:5432 | 1: 0/311C870 | read-write | primary | primary
worker3b | 3/6 | worker3b:5432 | 1: 0/311C870 | read-only | secondary | secondary
And then we have the following application connection string to use:
$ docker compose exec monitor pg_autoctl show uri
Type | Name | Connection String
-------------+---------+-------------------------------
monitor | monitor | postgres://autoctl_node@f0135b83edcd:5432/pg_auto_failover?sslmode=require
formation | default | postgres://coord0b:5432,coord0a:5432/citus?target_session_attrs=read-write&sslmode=require
And finally, the names of the nodes registered as Citus worker nodes also make more sense:
$ docker compose exec coord0a psql -d citus -c 'select * from citus_get_active_worker_nodes()'
node_name | node_port
-----------+-----------
worker1a | 5432
worker3a | 5432
worker2b | 5432
(3 rows)
Important
At this point, it is important to note that the Citus coordinator only
knows about the primary nodes in each group. The High Availability
mechanisms are all implemented in pg_auto_failover, which mostly uses the
Citus API citus_update_node during worker node failovers.
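To picture what that means, the toy model below keeps one metadata entry per worker group, pointing at the group's current primary. A failover rewrites the entry in place, which is the effect the Citus function citus_update_node(node_id, new_node_name, new_node_port) has on the real pg_dist_node table. The class, its methods, and the node ids are hypothetical; in a real cluster the ids come from pg_dist_node, and this sketch does not claim to reproduce the exact calls pg_auto_failover issues.

```python
# Toy model of the coordinator's worker metadata. Class, methods and node
# ids are hypothetical; real ids live in the pg_dist_node table.
from dataclasses import dataclass, field


@dataclass
class WorkerMetadata:
    nodes: dict = field(default_factory=dict)  # node_id -> (host, port)

    def add_node(self, node_id: int, host: str, port: int = 5432) -> None:
        self.nodes[node_id] = (host, port)

    def update_node(self, node_id: int, new_host: str, new_port: int = 5432) -> str:
        """Point an existing entry at a new primary and return the matching
        SQL statement (illustration only: no quoting or escaping)."""
        self.nodes[node_id] = (new_host, new_port)
        return f"SELECT citus_update_node({node_id}, '{new_host}', {new_port});"

    def active_workers(self):
        return sorted(host for host, _ in self.nodes.values())
```

Updating the group 1 entry from worker1a to worker1b leaves the coordinator with worker1b, worker2a and worker3a, the same list the worker failure section reaches.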
Our first Citus worker failover
We see that in the citus_get_active_worker_nodes() output we have worker1a, worker2b, and worker3a. As mentioned before, which node of a group is the primary should have no impact on the operations of the Citus cluster when all nodes are sized the same.
That said, some of you may prefer to have the A nodes as primaries to get started with. So let's implement our first worker failover. With pg_auto_failover, this is as easy as doing:
$ docker compose exec monitor pg_autoctl perform failover --group 2
15:40:03 9246 INFO Waiting 60 secs for a notification with state "primary" in formation "default" and group 2
15:40:03 9246 INFO Listening monitor notifications about state changes in formation "default" and group 2
15:40:03 9246 INFO Following table displays times when notifications are received
Time | Name | Node | Host:Port | Current State | Assigned State
---------+----------+-------+---------------+---------------------+--------------------
22:58:42 | worker2b | 2/7 | worker2b:5432 | primary | draining
22:58:42 | worker2a | 2/8 | worker2a:5432 | secondary | prepare_promotion
22:58:42 | worker2a | 2/8 | worker2a:5432 | prepare_promotion | prepare_promotion
22:58:42 | worker2a | 2/8 | worker2a:5432 | prepare_promotion | wait_primary
22:58:42 | worker2b | 2/7 | worker2b:5432 | primary | demoted
22:58:42 | worker2b | 2/7 | worker2b:5432 | draining | demoted
22:58:42 | worker2b | 2/7 | worker2b:5432 | demoted | demoted
22:58:43 | worker2a | 2/8 | worker2a:5432 | wait_primary | wait_primary
22:58:44 | worker2b | 2/7 | worker2b:5432 | demoted | catchingup
22:58:46 | worker2b | 2/7 | worker2b:5432 | catchingup | catchingup
22:58:46 | worker2b | 2/7 | worker2b:5432 | catchingup | secondary
22:58:46 | worker2b | 2/7 | worker2b:5432 | secondary | secondary
22:58:46 | worker2a | 2/8 | worker2a:5432 | wait_primary | primary
22:58:46 | worker2a | 2/8 | worker2a:5432 | primary | primary
So it took less than 5 seconds to do a full worker failover in worker group 2. Groups 1 and 3 already have their A node as primary, so we can now review the resulting cluster state:
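As a quick check of that figure, the sketch below subtracts the first notification timestamp (22:58:42) from the last one (22:58:46) in the failover output above. The timestamps only have one-second resolution, so the result is an approximation, not an exact duration. The helper name is hypothetical.

```python
# Worked check of the failover duration from the notification timestamps.
# One-second resolution means the true duration is only approximated.
from datetime import datetime


def elapsed_seconds(first: str, last: str, fmt: str = "%H:%M:%S") -> int:
    start = datetime.strptime(first, fmt)
    end = datetime.strptime(last, fmt)
    return int((end - start).total_seconds())


print(elapsed_seconds("22:58:42", "22:58:46"))  # 4
```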
$ docker compose exec monitor pg_autoctl show state
Name | Node | Host:Port | TLI: LSN | Connection | Reported State | Assigned State
---------+-------+---------------+----------------+--------------+---------------------+--------------------
coord0a | 0/3 | coord0a:5432 | 1: 0/312ADA8 | read-write | primary | primary
coord0b | 0/4 | coord0b:5432 | 1: 0/312ADA8 | read-only | secondary | secondary
worker1a | 1/1 | worker1a:5432 | 1: 0/311B610 | read-write | primary | primary
worker1b | 1/2 | worker1b:5432 | 1: 0/311B610 | read-only | secondary | secondary
worker2b | 2/7 | worker2b:5432 | 2: 0/50000D8 | read-only | secondary | secondary
worker2a | 2/8 | worker2a:5432 | 2: 0/50000D8 | read-write | primary | primary
worker3a | 3/5 | worker3a:5432 | 1: 0/311B648 | read-write | primary | primary
worker3b | 3/6 | worker3b:5432 | 1: 0/311B648 | read-only | secondary | secondary
Seen from the Citus coordinator, this looks like the following:
$ docker compose exec coord0a psql -d citus -c 'select * from citus_get_active_worker_nodes()'
node_name | node_port
-----------+-----------
worker1a | 5432
worker3a | 5432
worker2a | 5432
(3 rows)
Distribute Data to Workers
Let's create a database schema with a single distributed table.
$ docker compose exec app psql
-- in psql
CREATE TABLE companies
(
id bigserial PRIMARY KEY,
name text NOT NULL,
image_url text,
created_at timestamp without time zone NOT NULL,
updated_at timestamp without time zone NOT NULL
);
SELECT create_distributed_table('companies', 'id');
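create_distributed_table hash-distributes rows on the id column. The sketch below shows how a hash value maps to a shard, ASSUMING the default of 32 shards (citus.shard_count) splitting the signed 32-bit hash space into equal ranges, with the last shard ending at the maximum value. Citus hashes the column with PostgreSQL's own hash functions. zlib.crc32 below is only a stand-in, so the shard numbers will not match a real cluster.

```python
# Sketch of hash-range shard placement, ASSUMING 32 equal ranges over the
# signed 32-bit hash space. stand_in_hash is NOT PostgreSQL's hash function.
import zlib

INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def shard_index(hash_value: int, shard_count: int = 32) -> int:
    """Return which of the equal-width hash ranges contains hash_value."""
    width = 2**32 // shard_count
    index = (hash_value - INT32_MIN) // width
    return min(index, shard_count - 1)  # the last range absorbs any remainder


def stand_in_hash(company_id: int) -> int:
    """Map an id to a signed 32-bit value (illustrative only)."""
    unsigned = zlib.crc32(company_id.to_bytes(8, "little", signed=True))
    return unsigned - 2**32 if unsigned > INT32_MAX else unsigned
```

Each shard is then placed on one of the worker groups, which is why losing a group's primary would make part of the table unavailable without a failover.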
Next, download and ingest some sample data, still from within our psql session:
\copy companies from program 'curl -o- https://examples.citusdata.com/mt_ref_arch/companies.csv' with csv
# ( COPY 75 )
Handle Worker Failure
Now we'll intentionally crash a worker's primary node and observe how the pg_auto_failover monitor unregisters that node in the coordinator and registers the secondary instead.
# the pg_auto_failover keeper process will be unable to resurrect
# the worker node if pg_control has been removed
$ docker compose exec worker1a rm /tmp/pgaf/global/pg_control
# shut it down
$ docker compose exec worker1a /usr/lib/postgresql/14/bin/pg_ctl stop -D /tmp/pgaf
The keeper will attempt to start worker1a three times and then report the failure to the monitor, which promotes worker1b to replace worker1a. Citus worker worker1a is unregistered from the coordinator node, and worker1b is registered in its stead.
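The restart-then-report behaviour can be sketched as a small supervision loop. This is a hypothetical illustration: the function name and callbacks are invented, and only the count of three attempts comes from the text above. It is not pg_auto_failover's keeper code or configuration.

```python
# Hypothetical sketch of "try to restart a few times, then report".
# Not the real keeper implementation.
from typing import Callable


def supervise(start_postgres: Callable[[], bool],
              report_failure: Callable[[int], None],
              max_attempts: int = 3) -> bool:
    """Try to start Postgres up to max_attempts times.

    Returns True as soon as a start succeeds; otherwise reports the
    failure through the callback and returns False.
    """
    for _ in range(max_attempts):
        if start_postgres():
            return True
    report_failure(max_attempts)
    return False
```

With pg_control removed every start attempt fails, so in this model the report callback fires after the third attempt. That is the point where the monitor takes over and promotes the secondary.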
Asking the coordinator for active worker nodes now shows worker1b, worker2a, and worker3a:
$ docker compose exec app psql -c 'select * from citus_get_active_worker_nodes();'
node_name | node_port
-----------+-----------
worker3a | 5432
worker2a | 5432
worker1b | 5432
(3 rows)
Finally, verify that all rows of data are still present:
$ docker compose exec app psql -c 'select count(*) from companies;'
count
-------
75
Meanwhile, the keeper on worker1a heals the node. It runs pg_basebackup to fetch the current PGDATA from worker1b, the new primary. This restores, among other things, a new copy of the file we removed. After streaming replication completes, worker1b is a full-fledged primary and worker1a its secondary.
$ docker compose exec monitor pg_autoctl show state
Name | Node | Host:Port | TLI: LSN | Connection | Reported State | Assigned State
---------+-------+---------------+----------------+--------------+---------------------+--------------------
coord0a | 0/3 | coord0a:5432 | 1: 0/3178B20 | read-write | primary | primary
coord0b | 0/4 | coord0b:5432 | 1: 0/3178B20 | read-only | secondary | secondary
worker1a | 1/1 | worker1a:5432 | 2: 0/504C400 | read-only | secondary | secondary
worker1b | 1/2 | worker1b:5432 | 2: 0/504C400 | read-write | primary | primary
worker2b | 2/7 | worker2b:5432 | 2: 0/50FF048 | read-only | secondary | secondary
worker2a | 2/8 | worker2a:5432 | 2: 0/50FF048 | read-write | primary | primary
worker3a | 3/5 | worker3a:5432 | 1: 0/31CD8C0 | read-write | primary | primary
worker3b | 3/6 | worker3b:5432 | 1: 0/31CD8C0 | read-only | secondary | secondary
Handle Coordinator Failure
Because our application connection string includes both coordinator hosts with the option target_session_attrs=read-write, the database client will connect to whichever of these servers supports both reads and writes.
Let's use the same trick with the pg_control file to crash our primary coordinator, and watch how the monitor promotes the secondary:
$ docker compose exec coord0a rm /tmp/pgaf/global/pg_control
$ docker compose exec coord0a /usr/lib/postgresql/14/bin/pg_ctl stop -D /tmp/pgaf
After some time, the keeper on coord0a heals it, and the cluster converges to this state:
$ docker compose exec monitor pg_autoctl show state
Name | Node | Host:Port | TLI: LSN | Connection | Reported State | Assigned State
---------+-------+---------------+----------------+--------------+---------------------+--------------------
coord0a | 0/3 | coord0a:5432 | 2: 0/50000D8 | read-only | secondary | secondary
coord0b | 0/4 | coord0b:5432 | 2: 0/50000D8 | read-write | primary | primary
worker1a | 1/1 | worker1a:5432 | 2: 0/504C520 | read-only | secondary | secondary
worker1b | 1/2 | worker1b:5432 | 2: 0/504C520 | read-write | primary | primary
worker2b | 2/7 | worker2b:5432 | 2: 0/50FF130 | read-only | secondary | secondary
worker2a | 2/8 | worker2a:5432 | 2: 0/50FF130 | read-write | primary | primary
worker3a | 3/5 | worker3a:5432 | 1: 0/31CD8C0 | read-write | primary | primary
worker3b | 3/6 | worker3b:5432 | 1: 0/31CD8C0 | read-only | secondary | secondary
We can check that the data is still available through the new coordinator node too:
$ docker compose exec app psql -c 'select count(*) from companies;'
count
-------
75
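The query above works unchanged because of client-side host selection. The app service sets PGHOST to coord0a,coord0b and PGTARGETSESSIONATTRS to read-write. The sketch below is a simplified model of how libpq uses those settings: try hosts in order, skip unreachable ones and those that turn out read-only. Real libpq also handles timeouts and other target types, which this sketch ignores. The function name and status labels are hypothetical.

```python
# Simplified model of libpq's multi-host selection with
# target_session_attrs=read-write. Timeouts and other modes are ignored.
from typing import Dict, List, Optional


def pick_host(hosts: List[str], status: Dict[str, str]) -> Optional[str]:
    """status maps host -> "read-write", "read-only" or "down"."""
    for host in hosts:
        state = status.get(host, "down")
        if state == "down":
            continue  # connection failed, try the next host
        if state == "read-write":
            return host  # first writable server wins
        # read-only: the session is closed and the next host is tried
    return None
```

Before the crash this picks coord0a. While coord0a is down, and after it comes back as a secondary, it picks coord0b.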
Cleanup
To dispose of the entire tutorial environment, just use the following command:
$ docker compose down
Next steps
As mentioned in the first section of this tutorial, the way we use docker compose here is not meant to be production ready. It is useful, though, for understanding and playing with a distributed system such as Citus, and it makes it simple to introduce faults and see how pg_auto_failover High Availability reacts to them.
One obvious element missing from our docker compose based test rig, if we want to test the system better, is persistent volumes. It is possible to create external volumes and use them for each node in the docker compose definition. This allows restarting nodes over the same data set.
See the pg_autoctl do tmux compose session command for more details about how to run a docker compose test environment, including external volumes for each node.
Now is a good time to go read the Citus documentation too, so that you know how to use the cluster you just created!