Create and Configure Changefeeds

On this page Carat arrow pointing down

Core and Enterprise changefeeds offer different levels of configurability. Enterprise changefeeds allow for active changefeed jobs to be paused, resumed, and canceled.

This page describes:

Before you create a changefeed

  1. Enable rangefeeds on CockroachDB Advanced and CockroachDB Self-Hosted. Refer to Enable rangefeeds for instructions.
  2. Decide on whether you will run an Enterprise or Core changefeed. Refer to the Overview page for a comparative capability table.
  3. Plan the number of changefeeds versus the number of tables to include in a single changefeed for your cluster. We recommend limiting the number of changefeeds per cluster to 80. Refer to System resources and running changefeeds and Recommendations for the number of target tables.
  4. Consider whether your Enterprise changefeed use case would be better served by change data capture queries that can filter data on a single table. CDC queries can improve the efficiency of changefeeds because the job will not need to encode as much change data.
  5. Read the Considerations section that provides information on changefeed interactions that could affect how you configure or run your changefeed.

Enable rangefeeds

Changefeeds connect to a long-lived request called a rangefeed, which pushes changes as they happen. This reduces the latency of row changes, as well as reduces transaction restarts on tables being watched by a changefeed for some workloads.

Rangefeeds must be enabled for a changefeed to work. To enable the cluster setting:

icon/buttons/copy
SET CLUSTER SETTING kv.rangefeed.enabled = true;

Any created changefeeds will error until this setting is enabled. If you are working on a CockroachDB Serverless cluster, the kv.rangefeed.enabled cluster setting is enabled by default.

Enabling rangefeeds has a small performance cost (about a 5–10% increase in write latencies), whether or not the rangefeed is being used in a changefeed. When kv.rangefeed.enabled is set to true, a small portion of the latency cost is caused by additional write event information that is sent to the Raft log and for replication. The remainder of the latency cost is incurred once a changefeed is running; the write event information is reconstructed and sent to an active rangefeed, which will push the event to the changefeed.

The kv.closed_timestamp.target_duration cluster setting can be used with changefeeds. Resolved timestamps will always be behind by at least the duration configured by this setting. However, decreasing the duration leads to more transaction restarts in your cluster, which can affect performance.

Recommendations for the number of target tables

When creating a changefeed, it's important to consider the number of changefeeds versus the number of tables to include in a single changefeed:

  • Changefeeds each have their own memory overhead, so every running changefeed will increase total memory usage.
  • Creating a single changefeed that will watch hundreds of tables can affect the performance of a changefeed by introducing coupling, where the performance of a target table affects the performance of the changefeed watching it. For example, any schema change on any of the tables will affect the entire changefeed's performance.

To watch multiple tables, we recommend creating a changefeed with a comma-separated list of tables. However, we do not recommend creating a single changefeed for watching hundreds of tables.

Cockroach Labs recommends monitoring your changefeeds to track retryable errors and protected timestamp usage. Refer to the Monitor and Debug Changefeeds page for more information.

System resources and running changefeeds

When you are running more than 10 changefeeds on a cluster, it is important to monitor the CPU usage. A larger cluster will be able to run more changefeeds concurrently compared to a smaller cluster with more limited resources.

Note:

We recommend limiting the number of changefeeds per cluster to 80.

To maintain a high number of changefeeds in your cluster:

  • Connect to different nodes to create each changefeed. The node on which you start the changefeed will become the coordinator node for the changefeed job. The coordinator node acts as an administrator: keeping track of all other nodes during job execution and the changefeed work as it completes. As a result, this node will use more resources for the changefeed job. Refer to How does an Enterprise changefeed work? for more detail.
  • Consider logically grouping the target tables into one changefeed. When a changefeed pauses, it will stop emitting messages for the target tables. Grouping tables of related data into a single changefeed may make sense for your workload. However, we do not recommend watching hundreds of tables in a single changefeed. Refer to Garbage collection and changefeeds for more detail on protecting data from garbage collection when a changefeed is paused.

Considerations

  • Pause running changefeed jobs before you start a rolling upgrade process to move to a later version of CockroachDB. For more details, refer to the Upgrade CockroachDB version page.
  • If you require resolved message frequency under 30s, then you must set the min_checkpoint_frequency option to at least the desired resolved frequency.
  • Many DDL queries (including TRUNCATE, DROP TABLE, and queries that add a column family) will cause errors on a changefeed watching the affected tables. You will need to start a new changefeed. If a table is truncated that a changefeed with on_error='pause' is watching, you will also need to start a new changefeed. See change data capture Known Limitations for more detail.
  • Partial or intermittent sink unavailability may impact changefeed stability. If a sink is unavailable, messages can't send, which means that a changefeed's high-water mark timestamp is at risk of falling behind the cluster's garbage collection window. Throughput and latency can be affected once the sink is available again. However, ordering guarantees will still hold for as long as a changefeed remains active.
  • When an IMPORT INTO statement is run, any current changefeed jobs targeting that table will fail.
  • After you restore from a full-cluster backup, changefeed jobs will not resume on the new cluster. It is necessary to manually create the changefeeds following the full-cluster restore.
  • As of v22.1, changefeeds filter out VIRTUAL computed columns from events by default. This is a backward-incompatible change. To maintain the changefeed behavior in previous versions where NULL values are emitted for virtual computed columns, see the virtual_columns option for more detail.

The following Enterprise and Core sections outline how to create and configure each type of changefeed:

Configure a changefeed

An Enterprise changefeed streams row-level changes in a configurable format to a configurable sink (i.e., Kafka or a cloud storage sink). You can create, pause, resume, and cancel an Enterprise changefeed. For a step-by-step example connecting to a specific sink, see the Changefeed Examples page.

Create

To create an Enterprise changefeed, you can either:

Note:

Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters. Use Javascript's encodeURIComponent function or Go language's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.

Run CREATE CHANGEFEED

icon/buttons/copy
CREATE CHANGEFEED FOR TABLE table_name, table_name2 INTO '{scheme}://{host}:{port}?{query_parameters}' [WITH options];

We recommend using this changefeed when:

  • All changes to the table data are required with no filtering.
  • The CDC queries limitations are a blocker to how you would process data.

Use change data capture queries

icon/buttons/copy
CREATE CHANGEFEED [INTO sink] [WITH options] AS SELECT projection FROM table [WHERE predicate];

We recommend using change data capture queries when you need to:

  • Filter data to remove unnecessary messages.
  • Apply transformations to messages before sending to a sink.
  • Minimize any potential impact to the cluster from a changefeed job. For more detail on this, refer to How does an Enterprise Changefeeds Work?.

Sinkless changefeeds

When you create a changefeed without specifying a sink, CockroachDB sends the changefeed events to the SQL client:

icon/buttons/copy
CREATE CHANGEFEED FOR TABLE table_name [WITH options];

Consider the following regarding the display format in your SQL client:

  • If you do not define a display format, the CockroachDB SQL client will automatically use ndjson format.
  • If you specify a display format, the client will use that format (e.g., --format=csv).
  • If you set the client display format to ndjson and set the changefeed format to csv, you'll receive JSON format with CSV nested inside.
  • If you set the client display format to csv and set the changefeed format to json, you'll receive a comma-separated list of JSON values.

For more information, see CREATE CHANGEFEED.

Show

To show a list of Enterprise changefeed jobs:

icon/buttons/copy
SHOW CHANGEFEED JOBS;
    job_id             |                                                                                   description                                                                  | ...
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+ ...
  685724608744325121   | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH confluent_schema_registry = 'http://localhost:8081', format = 'avro', resolved, updated | ...
  685723987509116929   | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH confluent_schema_registry = 'http://localhost:8081', format = 'avro', resolved, updated | ...
(2 rows)

To show an individual Enterprise changefeed:

icon/buttons/copy
SHOW CHANGEFEED JOB {job_id};
        job_id       |                                     description                                      | user_name | status  |              running_status              |          created           |          started           | finished |          modified          |      high_water_timestamp      | error |    sink_uri    |  full_table_names   | topics | format
---------------------+--------------------------------------------------------------------------------------+-----------+---------+------------------------------------------+----------------------------+----------------------------+----------+----------------------------+--------------------------------+-------+----------------+---------------------+--------+----------
  866218332400680961 | CREATE CHANGEFEED FOR TABLE movr.users INTO 'external://aws' WITH format = 'parquet' | root      | running | running: resolved=1684438482.937939878,0 | 2023-05-18 14:14:16.323465 | 2023-05-18 14:14:16.360245 | NULL     | 2023-05-18 19:35:16.120407 | 1684438482937939878.0000000000 |       | external://aws | {movr.public.users} | NULL   | parquet
(1 row)

New in v23.1: All changefeed jobs will display regardless of if the job completed and when it completed. You can define a retention time and delete completed jobs by using the jobs.retention_time cluster setting.

For more information, refer to SHOW CHANGEFEED JOB.

Pause

To pause an Enterprise changefeed:

icon/buttons/copy
PAUSE JOB job_id;

For more information, refer to PAUSE JOB.

Resume

To resume a paused Enterprise changefeed:

icon/buttons/copy
RESUME JOB job_id;

For more information, refer to RESUME JOB.

Cancel

To cancel an Enterprise changefeed:

icon/buttons/copy
CANCEL JOB job_id;

For more information, refer to CANCEL JOB.

Modify a changefeed

To modify an Enterprise changefeed, pause the job and then use:

ALTER CHANGEFEED job_id [ADD table] [DROP table] [SET option] [UNSET option];

You can add new table targets, remove them, set new changefeed options, and unset them.

For more information, see ALTER CHANGEFEED.

Configuring all changefeeds

It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.

To pause all running changefeeds:

icon/buttons/copy
PAUSE JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('running'));

This will change the status for each of the running changefeeds to paused, which can be verified with SHOW CHANGEFEED JOBS.

To resume all running changefeeds:

icon/buttons/copy
RESUME JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('paused'));

This will resume the changefeeds and update the status for each of the changefeeds to running.

Create a changefeed

A core changefeed streams row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled.

To create a core changefeed:

icon/buttons/copy
EXPERIMENTAL CHANGEFEED FOR table_name;

For more information, see EXPERIMENTAL CHANGEFEED FOR.

Known limitations

See also


Yes No
On this page

Yes No