A tool for easy, declarative management of Kafka topics. Includes the ability to "apply" topic changes from YAML as well as a repl for interactive exploration of brokers, topics, consumer groups, messages, and more.
Managing Kafka topics via the standard tooling can be tedious and error-prone; there is no standard, declarative way to define topics (e.g., YAML files that can be checked in to git), and understanding the state of a cluster at any given point in time requires knowing and using multiple commands with different interfaces.
We created topicctl
to make the management of our Kafka topics more transparent and
user-friendly. The project was inspired by kubectl
and other tools that we've used in
non-Kafka-related contexts.
See this blog post for more details.
We recently revamped topicctl
to support ZooKeeper-less cluster access as well as some
additional security options (TLS/SSL and SASL)! See
this blog post to learn more about why and how we did
this.
All changes should be backwards compatible, but you'll need to update your cluster configs if you want to take advantage of these new features; see the clusters section below for more details on the latest config format.
The code for the old version has been preserved in the v0 branch if you run into problems and need to revert.
Check out the data-digger for a command-line tool that makes it easy to tail and summarize structured data in Kafka.
Either:
- Run go install github.com/segmentio/topicctl/cmd/topicctl@latest
- Clone this repo and run make install in the repo root
- Use the Docker image: docker pull segment/topicctl
If you use either of the first two options, the binary will be placed in $GOPATH/bin.
If you use Docker, you can run the tool via
docker run -ti --rm segment/topicctl [subcommand] [flags]
Depending on your Docker setup and what you're trying to do, you may also need to run in the host network via --net=host and/or mount local volumes with -v.
- Start up a 6 node Kafka cluster locally:
docker-compose up -d
- Run the net alias script to make the broker addresses available on localhost:
./scripts/set_up_net_alias.sh
- Apply the topic configs in examples/local-cluster/topics:
topicctl apply --skip-confirm examples/local-cluster/topics/*yaml
- Send some test messages to the topic-default topic:
topicctl tester --broker-addr=localhost:9092 --topic=topic-default
- Open up the repl (while keeping the tester running in a separate terminal):
topicctl repl --cluster-config=examples/local-cluster/cluster.yaml
- Run some test commands:
get brokers
get topics
get partitions
get partitions topic-default
get offsets topic-default
tail topic-default
- Increase the number of partitions in the topic-default topic by changing the partitions: ... value in topic-default.yaml to 9 and re-applying:
topicctl apply examples/local-cluster/topics/topic-default.yaml
- Bring down the local cluster:
docker-compose down
topicctl apply [path(s) to topic config(s)]
The apply
subcommand ensures that the actual state of a topic in the cluster
matches the desired state in its config. If the topic doesn't exist, the tool will
create it. If the topic already exists but its cluster state is out-of-sync,
then the tool will initiate the necessary changes to bring it into compliance.
See the Config formats section below for more information on the expected file formats.
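The reconciliation idea behind apply can be sketched in a few lines of Python. This is only an illustration and not topicctl's implementation: the plan_apply function, the dict shapes, and the action strings are all hypothetical, and raising an error on a partition decrease is an assumption based on the rule that partitions are never removed.

```python
def plan_apply(desired, actual):
    """Return the actions needed to move a topic to its desired state.

    `desired` is a dict with "partitions" and "settings" keys; `actual` has the
    same shape, or is None if the topic does not exist yet. These shapes are
    hypothetical and exist only for this illustration.
    """
    if actual is None:
        return ["create topic"]

    actions = []
    if desired["partitions"] > actual["partitions"]:
        actions.append(
            f"add partitions: {actual['partitions']} -> {desired['partitions']}"
        )
    elif desired["partitions"] < actual["partitions"]:
        # Assumption: a decrease is rejected, since partitions are never removed.
        raise ValueError("partition count cannot be decreased")

    # Only settings that differ from the cluster state produce an action.
    for key, value in sorted(desired["settings"].items()):
        if actual["settings"].get(key) != value:
            actions.append(f"set {key}={value}")
    return actions
```

Running the plan a second time against an already-updated topic returns an empty list, which is the property that makes repeated apply runs safe.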
topicctl [flags] bootstrap
The bootstrap subcommand creates topic configs, in the format expected by apply, from the existing topics in a cluster. This can be used to "import" topics not created or previously managed by topicctl.
The output can be sent to either a directory (if the --output flag is set) or stdout.
topicctl check [path(s) to topic config(s)]
The check
command validates that each topic config has the correct fields set and is
consistent with the associated cluster config. Unless --validate-only
is set, it then
checks the topic config against the state of the topic in the corresponding cluster.
topicctl create [flags] [command]
The create
command creates resources in the cluster from a configuration file.
Currently, only ACLs are supported. The create command is separate from the apply
command as it is intended for usage with immutable resources managed by topicctl.
topicctl delete [flags] [operation]
The delete
subcommand deletes a particular resource type in the cluster.
Currently, the following operations are supported:
Subcommand | Description |
---|---|
delete acls [flags] | Deletes ACL(s) in the cluster matching the provided flags |
topicctl get [flags] [operation]
The get
subcommand lists out the instances and/or details of a particular
resource type in the cluster. Currently, the following operations are supported:
Subcommand | Description |
---|---|
get balance [optional topic] | Number of replicas per broker position for topic or cluster as a whole |
get brokers | All brokers in the cluster |
get config [broker or topic] | Config key/value pairs for a broker or topic |
get groups | All consumer groups in the cluster |
get lags [topic] [group] | Lag for each topic partition for a consumer group |
get members [group] | Details of each member in a consumer group |
get partitions [optional: topics] | Get all partitions for topics |
get offsets [topic] | Number of messages per partition along with start and end times |
get topics | All topics in the cluster |
get acls [flags] | Describe access control lists (ACLs) in the cluster |
get users | All users in the cluster |
topicctl rebalance [flags]
When run with the --rebalance flag, the apply subcommand rebalances the specified topics across a cluster.
The rebalance subcommand, on the other hand, rebalances all of the topics defined under a given topic prefix path.
See the rebalancing section below for more information on rebalancing.
topicctl repl [flags]
The repl
subcommand starts up a shell that allows running the get
and tail
subcommands interactively.
topicctl reset-offsets [topic] [group] [flags]
The reset-offsets subcommand allows resetting the offsets for a consumer group in a topic. There are two main approaches for setting the offsets:
- Use a combination of the --partitions, --offset, --to-earliest, and --to-latest flags. The --partitions flag specifies a list of partitions to be reset, e.g. 1,2,3 ... If not used, the command defaults to resetting consumer group offsets for all of the partitions. The --offset flag indicates the specific value that all desired consumer group partitions will be set to. If not set, it will default to -2. Finally, the --to-earliest flag resets the offsets of consumer group members to the earliest offsets of the partitions, while --to-latest resets them to the latest offsets. Only one of the --to-earliest, --to-latest, and --offset flags can be used at a time. This approach is easy to use but lacks the ability for detailed offset configuration.
- Use the --partition-offset-map flag to specify a detailed offset configuration for individual partitions. For example, 1=5,2=10,7=12,... means that the consumer group offset for partition 1 must be set to 5, partition 2 to offset 10, partition 7 to offset 12, and so on. This approach provides greater flexibility and fine-grained control for this operation. Note that the --partition-offset-map flag is standalone and cannot be coupled with any of the previous flags.
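The flag rules above can be restated as a small Python sketch. It is not how topicctl parses its flags; the function names and keyword arguments are hypothetical stand-ins for the command-line flags, and the error messages are made up for illustration.

```python
def parse_partition_offset_map(text):
    """Parse a string such as "1=5,2=10,7=12" into {1: 5, 2: 10, 7: 12}."""
    offsets = {}
    for pair in text.split(","):
        partition, _, offset = pair.partition("=")
        # int() raises ValueError on malformed input such as "1=" or "a=b".
        offsets[int(partition)] = int(offset)
    return offsets


def check_reset_flags(offset=None, to_earliest=False, to_latest=False,
                      partitions=None, partition_offset_map=None):
    """Raise ValueError if the flag combination breaks the rules described above."""
    target_flags = sum([offset is not None, to_earliest, to_latest])
    if target_flags > 1:
        raise ValueError("use only one of --offset, --to-earliest, --to-latest")
    if partition_offset_map is not None and (target_flags or partitions):
        raise ValueError("--partition-offset-map cannot be combined with other flags")
```

For instance, check_reset_flags(partitions=[1, 2, 3], to_earliest=True) passes, while combining offset=5 with to_latest=True raises an error.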
topicctl tail [flags] [topic]
The tail
subcommand tails and logs out topic messages using the APIs exposed in
kafka-go. It doesn't have the full functionality
of kafkacat
(yet), but the output is prettier and it may be easier to use in some cases.
topicctl tester [flags]
The tester
command reads or writes test messages in a topic. For testing/demonstration purposes
only.
There are three ways to specify a target cluster in the topicctl subcommands:
- --cluster-config=[path], where the referenced path is a cluster configuration in the format expected by the apply command described above,
- --zk-addr=[zookeeper address] and --zk-prefix=[optional prefix for cluster in zookeeper], or
- --broker-addr=[bootstrap broker address]
All subcommands support the cluster-config pattern. The last two are also supported by the get, repl, reset-offsets, and tail subcommands since these can be run independently of an apply workflow.
We've tested topicctl on Kafka clusters with versions between 0.10.1 and 2.7.1, inclusive.
Note, however, that clusters at versions prior to 2.4.0
cannot use broker APIs for applying and
thus also require ZooKeeper API access for full functionality. See the
cluster access details section below for more details.
If you run into any unexpected compatibility issues, please file a bug.
topicctl
uses structured, YAML-formatted configs for clusters and topics. These are
typically source-controlled so that changes can be reviewed before being applied.
Each cluster associated with a managed topic must have a config. These configs can also be used
with the get
, repl
, reset-offsets
, and tail
subcommands instead of specifying a broker or
ZooKeeper address.
The following shows an annotated example:
meta:
  name: my-cluster                      # Name of the cluster
  environment: stage                    # Cluster environment
  region: us-west-2                     # Cloud region of the cluster
  shard: 1                              # Shard index of this cluster, if it is sharded.
  description: |                        # A free-text description of the cluster (optional)
    Test cluster for topicctl.
spec:
  bootstrapAddrs:                       # One or more broker bootstrap addresses
    - my-cluster.example.com:9092
  clusterID: abc-123-xyz                # Expected cluster ID for cluster (optional,
                                        # used as safety check only)
  # ZooKeeper access settings (only required for pre-v2 clusters; leave off to force exclusive use
  # of broker APIs)
  zkAddrs:                              # One or more cluster zookeeper addresses; if these are
    - zk.example.com:2181               # omitted, then the cluster will only be accessed via
                                        # broker APIs; see the section below on cluster access for
                                        # more details.
  zkPrefix: my-cluster                  # Prefix for zookeeper nodes if using zookeeper access
  zkLockPath: /topicctl/locks           # Path used for apply locks (optional)
  # TLS/SSL settings (optional, not supported if using ZooKeeper)
  tls:
    enabled: true                       # Whether TLS is enabled
    caCertPath: path/to/ca.crt          # Path to CA cert to be used (optional)
    certPath: path/to/client.crt        # Path to client cert to be used (optional)
    keyPath: path/to/client.key         # Path to client key to be used (optional)
  # SASL settings (optional, not supported if using ZooKeeper)
  sasl:
    enabled: true                       # Whether SASL is enabled
    mechanism: SCRAM-SHA-512            # Mechanism to use; choices are AWS-MSK-IAM, PLAIN,
                                        # SCRAM-SHA-256, and SCRAM-SHA-512
    username: my-username               # SASL username; ignored for AWS-MSK-IAM
    password: my-password               # SASL password; ignored for AWS-MSK-IAM
Note that the name
, environment
, region
, and description
fields are used
for description/identification only, and don't appear in any API calls. They can
be set arbitrarily, provided that they match up with the values set in the
associated topic configs.
If the tool is run with the --expand-env option, then the cluster config will be preprocessed
using os.ExpandEnv at load time. This replaces references of the form $ENV_VAR_NAME
or ${ENV_VAR_NAME} with the associated values from the environment.
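To make the substitution concrete, here is a simplified Python emulation of that expansion. It is a sketch only: topicctl itself relies on Go's os.ExpandEnv, the expand_env helper and its regular expression are hypothetical, and edge cases of the Go function (such as special single-character variables) are not covered. Like the Go function, it replaces undefined variables with an empty string.

```python
import re

# Matches ${NAME} or $NAME; a simplification of what Go's os.ExpandEnv accepts.
_VAR_PATTERN = re.compile(r"\$(?:\{([A-Za-z0-9_]+)\}|([A-Za-z_][A-Za-z0-9_]*))")


def expand_env(text, env):
    """Replace $NAME and ${NAME} references in `text` using the `env` mapping."""
    def replace(match):
        name = match.group(1) or match.group(2)
        # Undefined variables expand to the empty string.
        return env.get(name, "")

    return _VAR_PATTERN.sub(replace, text)
```

Passing os.environ as the env argument would expand against the real environment; a plain dict is convenient for experimenting with a config snippet.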
Additionally, the Amazon Resource Name (ARN) of a secret in AWS Secrets Manager can be provided instead of the username and password. Topicctl will then retrieve the secret value from Secrets Manager and use it as the credentials. The secret in Secrets Manager must have a value in the format shown below, identical to what AWS MSK requires.
{
  "username": "alice",
  "password": "alice-secret"
}
An example of a cluster config that uses Secrets Manager is shown below. Be sure to include the six random characters that AWS Secrets Manager appends to the end of a secret's ARN.
sasl:
  enabled: true
  mechanism: SCRAM-SHA-512
  secretsManagerArn: arn:aws:secretsmanager:<Region>:<AccountId>:secret:SecretName-6RandomCharacters
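As a rough illustration of the expected secret format, the following Python sketch parses a secret value and checks that both keys are present. The parse_sasl_secret helper is hypothetical, and retrieving the value from AWS Secrets Manager is deliberately left out.

```python
import json


def parse_sasl_secret(secret_string):
    """Return (username, password) from a secret value in the format shown above."""
    secret = json.loads(secret_string)
    missing = {"username", "password"} - set(secret)
    if missing:
        raise ValueError(f"secret is missing keys: {sorted(missing)}")
    return secret["username"], secret["password"]
```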
Each topic is configured in a YAML file. The following is an annotated example:
meta:
  name: topics-test                     # Name of the topic
  cluster: my-cluster                   # Name of the cluster
  environment: stage                    # Environment of the cluster
  region: us-west-2                     # Region of the cluster
  description: |                        # Free-text description of the topic (optional)
    Test topic in my-cluster.
  labels:                               # Custom key-value pairs purposed for topic bookkeeping (optional)
    key1: value1
    key2: value2
spec:
  partitions: 9                         # Number of topic partitions
  replicationFactor: 3                  # Replication factor per partition
  retentionMinutes: 360                 # Number of minutes to retain messages (optional)
  placement:
    strategy: in-rack                   # Placement strategy, see info below
    picker: randomized                  # Picker method, see info below (optional)
  settings:                             # Miscellaneous other config settings (optional)
    cleanup.policy: delete
    max.message.bytes: 5242880
The cluster
, environment
, and region
fields are used for matching
against a cluster config and double-checking that the cluster we're applying
in is correct; they don't appear in any API calls.
See the Kafka documentation
for more details on the parameters that can be set in the settings
field. Note
that retention time can be set in either this section or via retentionMinutes
but
not in both places. The latter is easier, so it's recommended.
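The relationship between the two ways of setting retention can be sketched as follows. This is an illustrative Python helper, not topicctl code: effective_retention_ms is a hypothetical name, and the spec argument is assumed to be the parsed spec section of a topic config. The only fixed fact it relies on is that Kafka's retention.ms setting is expressed in milliseconds.

```python
def effective_retention_ms(spec):
    """Return the retention in milliseconds for a parsed topic `spec`, or None."""
    settings = spec.get("settings", {})
    minutes = spec.get("retentionMinutes")
    if minutes is not None and "retention.ms" in settings:
        # Retention may be set in one place or the other, but not both.
        raise ValueError("set retentionMinutes or settings['retention.ms'], not both")
    if minutes is not None:
        return minutes * 60 * 1000
    value = settings.get("retention.ms")
    return int(value) if value is not None else None
```

With the example config above, retentionMinutes: 360 corresponds to a retention.ms value of 21600000.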
Multiple topics can be included in the same file, separated by ---
lines, provided
that they reference the same cluster.
The tool supports the following per-partition, replica placement strategies:
Strategy | Description |
---|---|
any | Allow any replica placement |
balanced-leaders | Ensure that the leaders of each partition are evenly distributed across the broker racks |
in-rack | Ensure that the followers for each partition are in the same rack as the leader; generally this is done when the leaders are already balanced, but this isn't required |
cross-rack | Ensure that the replicas for each partition are all in different racks; generally this is done when the leaders are already balanced, but this isn't required |
static | Specify the placement manually, via an extra staticAssignments field (example) |
static-in-rack | Specify the rack placement per partition manually, via an extra staticRackAssignments field (example) |
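The rack-based strategies can be read as predicates over a partition's replica list. The Python sketch below illustrates that reading under stated assumptions: the function names are hypothetical, the first replica in each list is treated as the leader (the usual Kafka convention for the preferred leader), and the tolerance of one used for "evenly distributed" is a guess rather than topicctl's actual rule.

```python
from collections import Counter


def is_in_rack(replicas, broker_racks):
    """True if every replica sits in the same rack as the leader (first replica)."""
    leader_rack = broker_racks[replicas[0]]
    return all(broker_racks[broker] == leader_rack for broker in replicas)


def is_cross_rack(replicas, broker_racks):
    """True if no two replicas of the partition share a rack."""
    racks = [broker_racks[broker] for broker in replicas]
    return len(set(racks)) == len(racks)


def leaders_are_balanced(assignments, broker_racks):
    """True if leader counts per rack differ by at most one (assumed tolerance)."""
    per_rack = Counter(broker_racks[replicas[0]] for replicas in assignments)
    counts = [per_rack[rack] for rack in set(broker_racks.values())]
    return max(counts) - min(counts) <= 1
```

Here broker_racks maps broker IDs to rack names, for example {1: "a", 2: "b", 3: "c", 4: "a", 5: "b", 6: "c"} for a 6 node, 3 rack cluster.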
There are often multiple options to pick from when updating a replica. For instance, with an
in-rack
strategy, we can pick any replica in the target rack that isn't already used in the
partition.
Currently, topicctl
supports the following methods for this replica "picking" process:
Method | Description |
---|---|
cluster-use | Pick based on broker frequency in the topic, then break ties by looking at the frequency of each broker across all topics in the cluster |
lowest-index | Pick based on broker frequency in the topic, then break ties by choosing the lowest-index broker |
randomized | Pick based on broker frequency in the topic, then break ties randomly. The underlying random generator uses a consistent seed (generated from the topic name, partition, and index), so the choice won't vary between apply runs. |
If no picking method is set in the topic config, then randomized
is used by default.
Note that these all try to achieve in-topic balance, and only vary in the case of ties. Thus, the placements won't be significantly different in most cases.
In the future, we may add pickers that allow for some in-topic imbalance, e.g. to correct a cluster-wide broker imbalance.
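A seeded tie-break of the kind described for the randomized picker might look like the following Python sketch. It is not topicctl's algorithm: pick_broker is a hypothetical function, preferring the least frequent broker and treating "lowest-index" as the lowest broker ID are assumptions, and the way the seed is derived from the topic name, partition, and index is invented for the example.

```python
import hashlib
import random
from collections import Counter


def pick_broker(candidates, topic_replicas, topic, partition, index,
                method="randomized"):
    """Pick one broker from `candidates` for replica slot `index` of a partition.

    `topic_replicas` is a flat list of the broker IDs currently used by the topic.
    """
    counts = Counter(topic_replicas)
    lowest = min(counts[broker] for broker in candidates)
    # Brokers tied for the lowest in-topic frequency.
    tied = sorted(broker for broker in candidates if counts[broker] == lowest)

    if method == "lowest-index":
        return tied[0]

    # Randomized: derive a stable seed so repeated runs make the same choice.
    seed_text = f"{topic}-{partition}-{index}"
    seed = int.from_bytes(hashlib.sha256(seed_text.encode()).digest()[:8], "big")
    return random.Random(seed).choice(tied)
```

Because the seed depends only on the topic name, partition, and index, calling the function twice with the same inputs returns the same broker.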
If apply
is run with the --rebalance
flag, then topicctl
will rebalance specified topics
after the usual apply steps. This process will check the balance of the brokers for each index
position (i.e., first, second, third, etc.) for each partition and make replacements if there
are any brokers that are significantly over- or under-represented.
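The per-position balance check can be pictured with a short Python sketch that counts how often each broker appears at each replica index. The brokers_per_position function is hypothetical, and how topicctl decides that a broker is "significantly" over- or under-represented is not modeled here.

```python
from collections import Counter


def brokers_per_position(assignments):
    """Count brokers at each replica index.

    `assignments` holds one replica list per partition, e.g. [[1, 2, 3], [2, 3, 1]].
    The result is a list of Counters, one per index position.
    """
    width = max(len(replicas) for replicas in assignments)
    counts = [Counter() for _ in range(width)]
    for replicas in assignments:
        for position, broker in enumerate(replicas):
            counts[position][broker] += 1
    return counts
```

A large spread between the highest and lowest count within one position suggests that the position is a candidate for rebalancing.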
The rebalance process can optionally remove brokers from a topic. To use this feature, set the
--to-remove
flag. Note that this flag has no effect unless --rebalance
is also set.
Rebalancing is not done by default on all apply runs because it can be fairly disruptive and generally shouldn't be necessary unless the topic started off in an imbalanced state or there has been a change in the number of brokers.
To rebalance all topics in a cluster, use the rebalance subcommand, which will perform the apply --rebalance
function on all qualifying topics. It will inventory all topic configs found at --path-prefix for a cluster
specified by --cluster-config.
This subcommand will not rebalance a topic if:
- the topic config is inconsistent with the cluster config (name, region, environment, etc.)
- the partition count of a topic in the Kafka cluster does not match the topic partition setting in the topic config
- a topic's retention.ms in the Kafka cluster does not match the topic's retentionMinutes setting in the topic config
- a topic does not exist in the Kafka cluster
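The skip conditions above can be expressed as one Python check. This is a sketch under assumptions, not the tool's code: skip_reason is a hypothetical function, the configs are assumed to be parsed into dicts shaped like the YAML examples in this document, cluster_topics is an invented mapping from topic name to its live state, and skipping the retention comparison when retentionMinutes is unset is a guess.

```python
def skip_reason(topic_config, cluster_config, cluster_topics):
    """Return why a topic would be skipped by rebalance, or None if it qualifies."""
    meta = topic_config["meta"]
    cluster_meta = cluster_config["meta"]
    expected = (cluster_meta["name"], cluster_meta["environment"], cluster_meta["region"])
    if (meta["cluster"], meta["environment"], meta["region"]) != expected:
        return "topic config is inconsistent with the cluster config"

    actual = cluster_topics.get(meta["name"])
    if actual is None:
        return "topic does not exist in the cluster"

    spec = topic_config["spec"]
    if actual["partitions"] != spec["partitions"]:
        return "partition count does not match the topic config"

    minutes = spec.get("retentionMinutes")
    # Assumption: the retention check only applies when retentionMinutes is set.
    if minutes is not None and actual.get("retention.ms") != minutes * 60 * 1000:
        return "retention.ms does not match retentionMinutes"
    return None
```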
Sets of ACLs can be configured in a YAML file. The following is an annotated example:
meta:
  name: acls-test                       # Name of the group of ACLs
  cluster: my-cluster                   # Name of the cluster
  environment: stage                    # Environment of the cluster
  region: us-west-2                     # Region of the cluster
  description: |                        # Free-text description of the group of ACLs (optional)
    Test ACLs in my-cluster.
  labels:                               # Custom key-value pairs purposed for ACL bookkeeping (optional)
    key1: value1
    key2: value2
spec:
  acls:
    - resource:
        type: topic                     # Type of resource (topic, group, cluster, etc.)
        name: test-topic                # Name of the resource to apply an ACL to
        patternType: literal            # Type of pattern (literal, prefixed, etc.)
        principal: User:my-user         # Principal to apply the ACL to
        host: "*"                       # Host to apply the ACL to (quoted, since a bare * is not valid YAML)
        permission: allow               # Permission to apply (allow, deny)
      operations:                       # List of operations to use for the ACLs
        - read
        - describe
The cluster
, environment
, and region
fields are used for matching
against a cluster config and double-checking that the cluster we're applying
in is correct; they don't appear in any API calls.
See the Kafka documentation
for more details on the parameters that can be set in the acls
field.
Multiple groups of ACLs can be included in the same file, separated by ---
lines, provided
that they reference the same cluster.
The bootstrap
, get
, repl
, and tail
subcommands are read-only and should never make
any changes in the cluster.
The apply
subcommand can make changes, but under the following conditions:
- A user confirmation is required for any mutation to the cluster
- Topics are never deleted
- Partitions can be added but are never removed
- All apply runs are interruptible and idempotent (see sections below for more details)
- Partition changes in apply runs are locked on a per-cluster basis
- Leader changes in apply runs are locked on a per-topic basis
- Partition replica migrations are protected via "throttles" to prevent the cluster network from getting overwhelmed
- Before applying, the tool checks the cluster ID against the expected value in the
cluster config. This can help prevent errors around applying in the wrong cluster when multiple
clusters are accessed through the same address, e.g. localhost:2181.
The reset-offsets
command can also make changes in the cluster and should be used carefully.
The create
command can be used to create new resources in the cluster. It cannot be used with
mutable resources.
Apply runs are designed to be idempotent: the effects should be the same no matter how many
times they are run, assuming everything else in the cluster remains constant (e.g., the number of
brokers, each broker's rack, etc.). An exception is replica rebalance operations, which can be
non-deterministic. Changes in other topics should generally not affect idempotency, except
possibly when the topic is configured to use the cluster-use picker.
If an apply run is interrupted, then any in-progress broker migrations or leader elections will continue and any applied throttles will be kept in-place. The next time the topic is applied, the process should continue from where it left off.
topicctl
can interact with a cluster through either ZooKeeper or by hitting broker APIs
directly.
Broker APIs are used exclusively if the tool is run with either of the following flags:
- --broker-addr, or
- --cluster-config, provided that the cluster config doesn't specify any ZK addresses
We recommend using this "broker only" access mode for all clusters running Kafka versions >= 2.4.
In all other cases, i.e. if --zk-addr
is specified or the cluster config has ZK addresses, then
ZooKeeper will be used for most interactions. A few operations that are not possible via ZK
will still use broker APIs, however, including:
- Group-related get commands: get groups, get lags, get members
- get offsets
- reset-offsets
- tail
- apply with topic creation
This "mixed" mode is required for clusters running Kafka versions < 2.0.
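The access-mode rules above can be condensed into a small Python sketch. The function names and the "broker-only" and "mixed" labels are hypothetical, the cluster config is assumed to be parsed into a dict shaped like the YAML example earlier, and the case where several options are passed at once is not something the text specifies.

```python
# Operations that, per the list above, use broker APIs even in "mixed" mode.
BROKER_API_OPERATIONS = {
    "get groups", "get lags", "get members", "get offsets",
    "reset-offsets", "tail", "apply with topic creation",
}


def access_mode(broker_addr=None, zk_addr=None, cluster_config=None):
    """Return "broker-only" or "mixed" for the given way of targeting a cluster."""
    if broker_addr:
        return "broker-only"
    if cluster_config is not None and not cluster_config.get("spec", {}).get("zkAddrs"):
        return "broker-only"
    # --zk-addr was given, or the cluster config lists ZooKeeper addresses.
    return "mixed"


def uses_zookeeper(operation, mode):
    """True if `operation` would go through ZooKeeper in the given mode."""
    return mode == "mixed" and operation not in BROKER_API_OPERATIONS
```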
There are a few limitations in the tool when using the broker APIs exclusively:
- Only newer versions of Kafka are supported. In particular:
  - v2.0 or greater is required for read-only operations (get brokers, get topics, etc.)
  - v2.4 or greater is required for applying topic changes
- Apply locking is not yet implemented; please be careful when applying to ensure that someone else isn't applying changes in the same topic at the same time.
- The values of some dynamic broker properties, e.g. leader.replication.throttled.rate, are marked as "sensitive" and not returned via the API; topicctl will show the value as SENSITIVE. This appears to be fixed in v2.6.
- Broker timestamps are not returned by the metadata API. These will be blank in the results of get brokers.
- Applying is not fully compatible with clusters provisioned in Confluent Cloud. It appears that Confluent prevents arbitrary partition reassignments, among other restrictions. Read-only operations seem to work.
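The version thresholds in the first limitation can be checked with a few lines of Python. This is only a convenience sketch: the helper names are hypothetical, and the version string is assumed to be purely numeric and dot-separated (for example "2.7.1").

```python
def parse_version(text):
    """Turn a version string such as "2.7.1" into a comparable tuple (2, 7, 1)."""
    return tuple(int(part) for part in text.split("."))


def broker_only_support(kafka_version):
    """Report which operation classes work when only broker APIs are used."""
    version = parse_version(kafka_version)
    return {
        "read_only": version >= (2, 0),  # get brokers, get topics, etc.
        "apply": version >= (2, 4),      # applying topic changes
    }
```

For example, a 2.3.1 cluster supports read-only operations in this mode but would still need ZooKeeper access for apply.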
TLS (referred to by the older name "SSL" in the Kafka documentation) is supported when running
topicctl in the exclusive broker API mode. To use this, either set --tls-enabled on the
command line or, if using a cluster config, set enabled: true in the TLS section of
the config.
In addition to standard TLS, the tool also supports mutual TLS using custom certs, keys, and CA certs (in PEM format). As with the enabling of TLS, these can be configured either on the command-line or in a cluster config. See this config for an example.
topicctl
supports SASL authentication when running in the exclusive broker API mode. To use this,
either set the --sasl-mechanism
and other appropriate --sasl-*
flags on the command line or
fill out the SASL
section of the cluster config.
The following mechanisms can be used:
- AWS-MSK-IAM
- PLAIN
- SCRAM-SHA-256
- SCRAM-SHA-512
If using AWS-MSK-IAM
, then topicctl
will attempt to discover your AWS credentials in the
locations and order described here.
The other mechanisms require a username and password to be set in either the cluster config
or on the command-line. See the cluster configs in the examples/auth and
examples/msk directories for some specific examples.
Note that SASL can be run either with or without TLS, although the former is generally more secure.
First, set up docker-compose and the associated network alias:
docker-compose up -d
./scripts/set_up_net_alias.sh
This will create a 6 node, 3 rack cluster locally with the brokers
accessible on 169.254.123.123.
Then, run:
make test
You can change the Kafka version of the local cluster by setting the
KAFKA_IMAGE_TAG
environment variable when running docker-compose up -d
. See the
bitnami/kafka
dockerhub page for more
details on the available versions.
To run the get
, repl
, and tail
subcommands against the local cluster,
set --zk-addr=localhost:2181
and leave the --zk-prefix
flag unset.
To test out apply
, you can use the configs in examples/local-cluster/
. For example,
to create all topics defined for that cluster:
topicctl apply examples/local-cluster/topics/*.yaml