CockroachDB Changefeeds: mapping primary key to Kafka partition

In the documentation for CockroachDB changefeeds (CDC), we read that

"Rows are sharded between Kafka partitions by the row’s primary key."

This post documents a simple experiment to verify that claim.

This comment from Sarama, the Go Kafka client used by CockroachDB, shows how this mapping from primary key to partition ID is accomplished:

// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
// random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
// modulus the number of partitions. This ensures that messages with the same key always end up on the
// same partition.
func NewHashPartitioner(topic string) Partitioner {
    p := new(hashPartitioner)
    p.random = NewRandomPartitioner(topic)
    p.hasher = fnv.New32a()
    p.referenceAbs = false
    return p
}
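
The Partition method behind this constructor hashes the encoded key bytes with FNV-1a and reduces the result modulo the partition count. Below is a minimal sketch of that arithmetic, separate from Sarama itself; the sample key ["u281z6trn"] is only an assumption about how the changefeed encodes the Munich row's primary key, so the printed partition number should not be taken as authoritative.

package main

import (
    "fmt"
    "hash/fnv"
)

// partitionForKey mirrors the core of Sarama's hash partitioner:
// FNV-1a over the key bytes, then modulo the number of partitions,
// with the sign flipped if the int32 conversion went negative.
func partitionForKey(key []byte, numPartitions int32) int32 {
    h := fnv.New32a()
    h.Write(key)
    partition := int32(h.Sum32()) % numPartitions
    if partition < 0 {
        partition = -partition
    }
    return partition
}

func main() {
    // Assumed key bytes for the Munich row; the actual bytes CockroachDB
    // emits as the Kafka message key depend on the changefeed's key encoding.
    fmt.Println(partitionForKey([]byte(`["u281z6trn"]`), 3))
}

Because the hash depends only on the key bytes, every revision of a given row is routed to the same partition, which is exactly the property the experiment below tries to observe.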

Here is the DDL for the table:

CREATE TABLE tourist_locations
    (
      name TEXT
      , lat FLOAT8
      , lon FLOAT8
      , enabled BOOLEAN DEFAULT TRUE
      , geohash CHAR(9) AS (ST_GEOHASH(ST_SETSRID(ST_MAKEPOINT(lon, lat), 4326), 9)) STORED
      , CONSTRAINT "primary" PRIMARY KEY (geohash ASC)
);

The plan

  1. Create a Kafka topic with 3 partitions
  2. Create a CockroachDB enterprise changefeed on a table, into this Kafka topic
  3. Start 3 Kafka consumers, one per partition, all listening on this topic
  4. Run an UPDATE statement affecting all rows of the table
  5. Observe the output from the consumers
  6. Run a loop, updating the table multiple times
  7. Observe that a given row exists solely within the output from a given partition

The detailed steps

  1. $ ./kafka_2.13-3.2.3/bin/kafka-topics.sh --create --topic tourist_locations --replication-factor 1   --partitions 3 --bootstrap-server localhost:9092
    
  2. (Using a psql CLI): CREATE CHANGEFEED FOR TABLE tourist_locations INTO 'kafka://localhost:9092';
  3. $ for p in 0 1 2 ; do ( ./kafka_2.13-3.2.3/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition $p --topic tourist_locations > p${p}.log & ) ; done
  4. (Using psql CLI): UPDATE tourist_locations SET enabled = NOT enabled;
  5. (Just looking at p1.log):
     $ cat p1.log
     {"after": {"enabled": false, "geohash": "gc7x3p771", "lat": 53.346028, "lon": -6.279658, "name": "Dublin"}}
     {"after": {"enabled": false, "geohash": "gcpvj0e27", "lat": 51.506712, "lon": -0.127235, "name": "Trafalgar Square"}}
     {"after": {"enabled": false, "geohash": "u09tvwkbg", "lat": 48.857744, "lon": 2.357768, "name": "Le Marais"}}
     {"after": {"enabled": false, "geohash": "u281z6trn", "lat": 48.135056, "lon": 11.576097, "name": "Munich"}}
  6. for i in {1..100} ; do psql postgres://root@localhost:26257/defaultdb -c "UPDATE tourist_locations SET enabled = NOT enabled" ; done
  7. (Look for that "Munich" row):
     $ grep -c Munich p?.log
     p0.log:0
     p1.log:101
     p2.log:0

The Results

The row for "Munich", which has a primary key of u281z6trn, appeared only in the output of the Kafka consumer reading partition 1. That is what the documentation stated, so our observations are consistent with it, at least for this run of 101 UPDATEs (the initial statement plus the 100 in the loop).
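
To extend the step 7 check beyond the Munich row, the per-partition logs can be parsed and every primary key checked for the same property. The sketch below is a hypothetical helper, not part of the experiment above: it reads p0.log, p1.log and p2.log and counts how many distinct partition logs each geohash appeared in; the documentation's claim holds if that count is 1 for every key.

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "os"
)

// envelope matches the changefeed's JSON output shown in step 5; only the
// primary-key column is needed here.
type envelope struct {
    After struct {
        Geohash string `json:"geohash"`
    } `json:"after"`
}

func main() {
    // seen maps each primary key to the set of partition logs it appeared in.
    seen := map[string]map[string]bool{}
    for _, file := range []string{"p0.log", "p1.log", "p2.log"} {
        f, err := os.Open(file)
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            continue
        }
        scanner := bufio.NewScanner(f)
        for scanner.Scan() {
            var e envelope
            if err := json.Unmarshal(scanner.Bytes(), &e); err != nil || e.After.Geohash == "" {
                continue // skip blank or non-JSON lines
            }
            if seen[e.After.Geohash] == nil {
                seen[e.After.Geohash] = map[string]bool{}
            }
            seen[e.After.Geohash][file] = true
        }
        f.Close()
    }
    for key, files := range seen {
        fmt.Printf("%s appeared in %d partition log(s)\n", key, len(files))
    }
}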

As always, thanks for reading!