Kafka/Administration
== Safe Broker Restarts ==
Since partitions have at least 2 (usually 3) replicas, you should be able to restart a broker without losing any messages. Brokers, consumers, and producers will automatically rebalance themselves when a broker dies, but it is nice to allow them to do so gracefully. <tt>service kafka stop</tt> will perform a graceful shutdown. Before you run this, make sure that any topics for which the target broker is the leader also have other brokers in their ISR (check with <tt>kafka topics --describe</tt>). Once the broker has stopped, it should no longer appear as the leader for any topic-partition.
Once you are ready to start the broker back up, you can do so with a simple <tt>service kafka start</tt>.
It will likely take a few minutes for the broker to recover after restarting. It needs to check all of its recent logs to make sure that it doesn't have any incomplete messages. It will also start replicating partitions from where it left off when it was restarted. Keep checking <tt>kafka topics --describe</tt> until all topic-partitions have all brokers in the ISR. Once the topic-partitions are up to date on all brokers, you can start a replica election to balance the leaders across brokers. NOTE: auto leader rebalancing is enabled, but after any broker restarts you should probably run <tt>kafka preferred-replica-election</tt>, as it is possible that high volume partitions might not resync fast enough to be considered by the auto rebalancer. See [https://phabricator.wikimedia.org/T207768 T207768] for more info.
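A minimal sketch of that post-restart check, assuming the <tt>kafka</tt> wrapper script described later on this page is available on the host and passes extra flags through to the underlying kafka-topics tool:
<syntaxhighlight lang="bash">
# Wait until no topic-partition is under replicated, then trigger a preferred replica election.
# The 30 second interval is arbitrary; adjust to taste.
while kafka topics --describe --under-replicated-partitions 2>/dev/null | grep -q 'Topic:'; do
    echo "Some partitions are still under replicated, waiting..."
    sleep 30
done
kafka preferred-replica-election
</syntaxhighlight>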
== Replica Elections ==
To trigger a leadership rebalance manually, do the following.
List topics to see the current leader assignments:
Run <tt>kafka topics --describe</tt> and check the Leader column of each partition. If the leaders are already balanced across all brokers, there is nothing to do. If they are not (e.g. one broker never appears as a leader), you can ask Kafka to run a leader election by executing <tt>kafka preferred-replica-election</tt> on one of the brokers (no sudo needed; which broker you run it on does not matter).
Wait a few seconds, no more than a minute. If all goes well, run <tt>kafka topics --describe</tt> again and you should see the leaders properly balanced.
== Verify the leader of a topic ==
Kafka nomenclature recap: a generic queue is called a 'topic', and each one of them can be split into multiple partitions that producers and consumers will use to spread the load. For example, let's use the '''kafka topics --describe''' command described above to inspect a topic's state:
<syntaxhighlight lang="bash">
Topic:webrequest_text PartitionCount:24 ReplicationFactor:3 Configs:retention.bytes=375809638400
Topic: webrequest_text Partition: 0 Leader: 18 Replicas: 18,22,12 Isr: 22,18,12
Topic: webrequest_text Partition: 1 Leader: 20 Replicas: 20,12,13 Isr: 20,13,12
Topic: webrequest_text Partition: 2 Leader: 22 Replicas: 22,13,14 Isr: 13,14,22
Topic: webrequest_text Partition: 3 Leader: 12 Replicas: 12,14,18 Isr: 14,18,12
Topic: webrequest_text Partition: 4 Leader: 13 Replicas: 13,18,20 Isr: 20,13,18
[...CUT...]
Topic: webrequest_text Partition: 22 Leader: 20 Replicas: 20,22,12 Isr: 20,22,12
Topic: webrequest_text Partition: 23 Leader: 22 Replicas: 22,12,13 Isr: 13,22,12
</syntaxhighlight>
The webrequest_text topic contains data coming from Varnishkafka, related to the text cache HTTP requests. It is composed of 24 partitions (you can think of them as the real queues that consumers/producers use), each of them replicated three times. Zooming in:
<syntaxhighlight lang="bash">
Topic: webrequest_text Partition: 0 Leader: 18 Replicas: 18,22,12 Isr: 22,18,12
</syntaxhighlight>
Partition 0 has three replicas, stored on the kafka1018, kafka1012 and kafka1022 brokers. The broker that acts as leader for the partition is kafka1018; the other two have the duty to keep themselves in sync with it. The number of replicas in sync with the leader is described by the Isr field (short for In Sync Replicas), and of course the ideal state is when "Replicas" is the same as "Isr". Losing one Isr replica may happen when a broker goes down for some reason, and it is usually not a big deal if nothing else is happening (2/3 in-sync replicas are enough to ensure resiliency).
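If you only care about one topic, or only about partitions whose ISR is currently incomplete, <tt>kafka topics</tt> can filter the output for you. A quick sketch (the topic name is just an example):
<syntaxhighlight lang="bash">
# Describe a single topic instead of the whole cluster
kafka topics --describe --topic webrequest_text

# List only the partitions whose ISR is smaller than the replica set
kafka topics --describe --under-replicated-partitions
</syntaxhighlight>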
== Alter topic partitions number ==
Be careful when doing this, as it is not reversible. Consumers may be relying on a certain number of partitions, so make sure you are aware of any downstream consequences for consumers before doing this.
<code>kafka topics --alter --topic my_topic_name --partitions 5</code>
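A quick way to double check the change, assuming a hypothetical topic named my_topic_name (the partition count is reported on the first line of the describe output):
<syntaxhighlight lang="bash">
kafka topics --describe --topic my_topic_name | head -1
kafka topics --alter --topic my_topic_name --partitions 5
kafka topics --describe --topic my_topic_name | head -1
</syntaxhighlight>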
== Alter topic retention settings ==
Our default is to keep data in a topic for 7 days. You can limit retention both by time and by size. The time-based retention setting is <tt>retention.ms</tt> and the size-based retention setting is <tt>retention.bytes</tt>. To alter a topic config:
<syntaxhighlight lang="bash">
kafka configs --alter --entity-type topics --entity-name my_topic_name --add-config retention.ms=2678400000
</syntaxhighlight>
To undo this and revert to the default setting, you can delete the config setting:
<syntaxhighlight lang="bash">
kafka configs --alter --entity-type topics --entity-name my_topic_name --delete-config retention.ms
</syntaxhighlight>
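To see which overrides are currently applied to a topic (topic name is illustrative), the same <tt>kafka configs</tt> wrapper can describe them:
<syntaxhighlight lang="bash">
# Shows only the per-topic overrides, not the cluster-wide defaults
kafka configs --describe --entity-type topics --entity-name my_topic_name
</syntaxhighlight>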
== Delete a topic ==
Kafka stores all the topic names in Zookeeper, so the easiest way to delete a topic is to use the following command from a Kafka host of the cluster in which the topic has been created (we'll use main-eqiad as an example from now on):
<syntaxhighlight lang="bash">
#~ kafka topics --delete --topic thisisatest
kafka-topics --zookeeper conf1004.eqiad.wmnet,conf1005.eqiad.wmnet,conf1006.eqiad.wmnet/kafka/main-eqiad --delete --topic thisisatest
</syntaxhighlight>
As you can see there is a bash script on every Kafka host that automatically expands the kafka-prefixed commands with all the boring parameters like zookeeper hostnames, paths, etc. Before explaining what the command does under the hood, here's a list of places in which a topic name may be found in Zookeeper (keeping main-eqiad as the example):
* /kafka/main-eqiad/brokers/topics | |||
* /kafka/main-eqiad/admin/delete_topics | |||
* /kafka/main-eqiad/config/topics | |||
The first path stores znodes that hold the status of a given topic/partition, in particular what brokers are handling it: | |||
<syntaxhighlight lang="bash"> | |||
[zk: localhost:2181(CONNECTED) 14] get /kafka/main-eqiad/brokers/topics/eqiad.mediawiki.job.categoryMembershipChange/partitions/0/state | |||
{"controller_epoch":96,"leader":1001,"version":1,"leader_epoch":102,"isr":[1003,1002,1001]} | |||
[..cut..] | |||
</syntaxhighlight> | |||
The second one contains the list of topics flagged to be deleted, for example after running the <tt>kafka topics --delete</tt> command shown above. The last one holds generic information about the topic:
<syntaxhighlight lang="bash"> | |||
[zk: localhost:2181(CONNECTED) 17] get /kafka/main-eqiad/config/topics/eqiad.mediawiki.job.categoryMembershipChange | |||
{"version":1,"config":{}} | |||
[..cut..] | |||
</syntaxhighlight> | |||
So in a normal scenario, the <tt>kafka topics --delete</tt> command should take care of these three paths by itself, setting up znodes in a way that instructs the Kafka brokers to do the proper clean up work. During an [[Incident documentation/20180711-kafka-eqiad|outage]] we discovered that sometimes this doesn't work, for example in the presence of Kafka core bugs like https://issues.apache.org/jira/browse/KAFKA-7156. In that particular case, the Kafka brokers were not able to delete topics due to a filename length problem on the ext4 data disk partitions. Since zookeeper was not updated correctly, even manual rm commands were not doing the right thing, because during boot the brokers always try to re-create a directory structure under the data directory that resembles what is stored in zookeeper. So during the outage the brokers kept re-creating the topics after each restart, ending up in the same problem over and over again. We had to grab the list of topics to delete and run the following for each one of them:<syntaxhighlight lang="bash">
#!/bin/bash
for topic in $(cat topics_to_delete) | |||
do | |||
echo "Deleting ${topic}" | |||
/usr/share/zookeeper/bin/zkCli.sh rmr /kafka/main-eqiad/brokers/topics/${topic} | |||
/usr/share/zookeeper/bin/zkCli.sh rmr /kafka/main-eqiad/config/topics/${topic} | |||
/usr/share/zookeeper/bin/zkCli.sh rmr /kafka/main-eqiad/admin/delete_topics/${topic} | |||
done | |||
</syntaxhighlight> | |||
The above procedure is the nuclear option to use if nothing else works, and should be done with all brokers stopped, to avoid them interfering with the deletion. After zookeeper is clean, the Kafka brokers can be restarted one at a time and they should be able to boot correctly without any issue.
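After such a manual cleanup it is worth double checking that no znode for the deleted topics is left behind. A quick sketch, reusing the zkCli invocation style from the script above (cluster path and topic name are just examples):
<syntaxhighlight lang="bash">
# Both commands should show no trace of the deleted topic
/usr/share/zookeeper/bin/zkCli.sh ls /kafka/main-eqiad/brokers/topics | grep thisisatest
/usr/share/zookeeper/bin/zkCli.sh ls /kafka/main-eqiad/admin/delete_topics
</syntaxhighlight>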
== Handling a downed broker ==
It happened in the past that events like a single disk failure in the analytics-eqiad cluster caused tons of alarms to fire due to a Kafka broker going down (we are not using RAID but JBOD for analytics-eqiad, which is why a disk failure causes trouble). Generally, one Kafka broker down in our clusters does not cause any harm to the overall Kafka availability, but it is wise to double check when that happens. The above sections (especially "Verify the leader of a topic") are a good start to make sure that the cluster is healthy; moreover, it is wise to check the various dashboards listed in [[Kafka#Monitoring]].

Usually the impact of a single broker down is nothing more than alarms for replicas not in sync, but as discussed in the above sections this is not a critical situation if nothing else explodes at the same time. Just <tt>systemctl mask</tt> the kafka daemon and file a task to fix the hardware issue. During this time, daemons like Varnishkafka could log several errors stating that the broker is down, but librdkafka is strong enough to guarantee that no message is lost. We are not aware of any bug or software issue that might cause this to happen to a Kafka broker.

== Recovering a laggy broker replica ==
If a Kafka broker goes offline for a long while, it will likely come back online far behind in logs. It will need to catch up on logs from the remaining brokers before it can be put back into the ISR. During normal operation, replicas should be able to stay in sync with each other, but when one broker is far behind you may need to tweak settings to encourage Kafka to spend more resources keeping replicas up to date.
; <tt>num.replica.fetchers</tt>
: This setting controls the number of threads dedicated to fetching logs from other replicas. Bump this number up temporarily and restart Kafka to try to get it to consume faster.
; <tt>replica.fetch.max.bytes</tt>
: This is the number of bytes that each fetch request will attempt to consume from each topic-partition. The actual number of bytes being requested at a time will be this multiplied by the number of topic-partitions. Be careful not to set this too high.

== Checking consumer offsets ==
As of November 2015, LinkedIn's Burrow is installed and running on krypton.eqiad.wmnet. It is configured to email analytics admins if consumers start lagging. You can also query it directly via its HTTP interface. E.g., to see if a particular consumer group is lagging, and where its latest offset commits are:
 curl http://krypton.eqiad.wmnet:8000/v2/kafka/eqiad/consumer/mysql-m4-master/topic/eventlogging-valid-mixed
 {"error":false,"message":"consumer group topic offsets returned","offsets":[79014572,79014599,79003923,79014602,79014593,79014599,79014574,79014599,79003640,79014585,79014592,79014597]}
=== CLI === | |||
Run <code>kafka-consumer-groups</code> with no flags to get a full list of options. | |||
Get a list of groups:
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --list | |||
Get a list of topics for a group with their consumer offsets: | |||
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group <group> | |||
Setting an offset to latest for a topic in group: | |||
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --reset-offsets --to-latest --group <group> --topic <topic> --dry-run | |||
When ready to execute the operation, change <code>--dry-run</code> to <code>--execute</code>. | |||
The offset for a topic partition can be specified as well: <code>--topic <topic>:<partition>(,<partition>)</code> | |||
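Offsets can also be rewound to a point in time rather than to latest. A hedged example (group, topic, and timestamp are placeholders; keep <code>--dry-run</code> until you are sure):
<syntaxhighlight lang="bash">
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 \
    --reset-offsets --to-datetime 2022-01-01T00:00:00.000 \
    --group <group> --topic <topic> --dry-run
</syntaxhighlight>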
== Purge broker logs ==
After being off for a while, when a broker rejoins its cluster it will replicate anything it has missed while it was off. Kafka's <tt>log.retention.hours</tt> setting is applied by looking at the mtime (or ctime?) of its log data files. On a recently started broker, these files are written later than usual, as replication catches back up. These files will have mtimes later than the produce time of the messages inside them, and as such their deletion may be delayed. This can cause disks to fill up more than usual.
If you receive an alarm like:
<pre>
<icinga-wm_> PROBLEM - Disk space on kafka1012 is CRITICAL: DISK CRITICAL - free space: /var/spool/kafka/b 73705 MB (3% inode=99%): /var/spool/kafka/f 127290 MB (6% inode=99%)
</pre>
You might want to purge some old logs from the broker's partitions. One strategy is to lower the log retention rate. You can do this either per topic dynamically, or statically for all topics by editing server.properties.
=== Temporarily Modify Per Topic Retention Settings ===
If you are just trying to buy some time until the global retention setting takes effect, you can dynamically alter the configuration for a topic. If you do this for a high volume topic, you might get Kafka to delete just enough data on disk to buy you enough time.
<syntaxhighlight lang="bash">
# Set retention.ms for the high volume webrequest upload topic. This will delete any log files on disk older than 48 hours.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --add-config retention.ms=172800000

# Set retention.bytes for the high volume webrequest text topic partitions.
# Please note the fact that you will have to set the maximum size of the
# topic's partitions, not the topic as a whole!
# This will delete any log files on disk bigger than 536GB.
kafka configs --alter --entity-type topics --entity-name webrequest_text --add-config retention.bytes=536870912000
# Wait until brokers delete data. Make sure you have enough room to spare. If not, consider setting retention.ms for another topic.
# Once brokers have deleted data, it is safe to delete the per topic config override to reset it to the global default.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --delete-config retention.ms
</syntaxhighlight>
=== Temporarily Edit Global Retention Settings ===
If you need data for many topics deleted, it may be worth temporarily changing the global retention setting. This is more disruptive than doing so dynamically, since you must restart brokers for the config to be applied.
<syntaxhighlight lang="bash">
sudo puppet agent --disable
sudo vim /etc/kafka/server.properties # lower down log.retention.hours=168
sudo service kafka restart
</syntaxhighlight>
Please remember to log your work in #wikimedia-operations. After checking /var/log/kafka/server.log for a confirmation of the delete actions (together with a <tt>df -h</tt> of course), please restore the previous config:
<syntaxhighlight lang="bash">
sudo vim /etc/kafka/server.properties # reset to default log.retention.hours=168
sudo service kafka restart
sudo puppet agent --enable
</syntaxhighlight>
After doing this for a broker, you should remember to run <tt>kafka preferred-replica-election</tt> to rebalance topic-partition leadership.
== Kafka Certificates ==
Newer versions of Kafka support TLS encryption, authentication and authorization. Certificates and keys are managed using [[cergen]]. Certificates are signed by our Puppet CA and distributed using Puppet. To create a new client key and certificate, add an entry to a cergen manifest file and run cergen with the --generate option as described on the cergen documentation page. git add and commit the files to the puppet private repository, and then distribute the relevant files via puppet and configure your client.
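Once the files have been distributed, a quick sanity check of what was generated can be done with openssl (the path below is only a placeholder; the actual location depends on how the client is configured):
<syntaxhighlight lang="bash">
# Print the subject, issuer (should be the Puppet CA) and expiry date of the client certificate
openssl x509 -in /path/to/my_client.crt.pem -noout -subject -issuer -enddate
</syntaxhighlight>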
== Kafka ACLs ==
Kafka ACLs are used to restrict access to Kafka cluster operations, Kafka topics, and Kafka consumer groups. By default, if an ACL exists for a specific resource, e.g. a topic, then all operations on that resource will be denied to any principal (AKA certificate) not explicitly listed for that resource. We want to allow anonymous unencrypted uses of most Kafka topics, but restrict certain others. For example, we'd like to restrict writes to any webrequest topic to only varnishkafka producers, but still allow for anyone to consume. To do this, we need the proper invocation of the <code>kafka acls</code> command. (Much of this was originally figured out and documented in [[phab:T167304|T167304]].)
For certificate based authentication, we need to specify the full x509 DistinguishedName. To keep things simple, we generate subject-less certificates, so that we only have to use the CommonName in the ACLs. Before we set up any restricted ACLs at all, we need to allow defaults for the ANONYMOUS user:
<syntaxhighlight lang="bash">
# Allow ANONYMOUS to produce to any topic
kafka acls --add --allow-principal User:ANONYMOUS --producer --topic '*'
# Allow ANONYMOUS to consume from any topic in any consumer group
kafka acls --add --allow-principal User:ANONYMOUS --consumer --topic '*' --group '*'
# After this, kafka acls --list should show:
kafka acls --list
Current ACLs for resource `Group:*`:
 User:ANONYMOUS has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:*`:
 User:ANONYMOUS has Allow permission for operations: Describe from hosts: *
 User:ANONYMOUS has Allow permission for operations: Write from hosts: *
 User:ANONYMOUS has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Cluster:kafka-cluster`:
 User:ANONYMOUS has Allow permission for operations: Create from hosts: *
</syntaxhighlight>
Now we can restrict operations on resources to other principals. When a client communicates over the SSL port 9093, it will attempt to authenticate with its certificate's DN. You need to add ACLs for that DN if you want that client to be able to use Kafka at all. For this example, we'll allow User:CN=varnishkafka to produce to the webrequest topic, restrict anyone else from producing to webrequest, but still allow anyone to read from webrequest. Note that we've used wildcard topic and group names here. Kafka does not (yet?) support full glob wildcards. It is all or nothing! E.g. --topic 'webrequest_*' will not work.
<syntaxhighlight lang="bash">
# Allow User:CN=varnishkafka to be a producer for the webrequest topic
kafka acls --add --allow-principal User:CN=varnishkafka --producer --topic webrequest
# Deny unauthenticated users the ability to write to the webrequest topic
kafka acls --add --deny-principal User:ANONYMOUS --operation Write --topic webrequest

# Now we have the following ACLs defined:
kafka acls --list
[..cut..]
User:ANONYMOUS has Allow permission for operations: Create from hosts: *
User:CN=varnishkafka has Allow permission for operations: Create from hosts: *
</syntaxhighlight>
Because no ACL exists for Topic:webrequest Read operation, any User, including User:ANONYMOUS, can still consume from the webrequest topic.
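To review or undo an ACL later, the same wrapper supports --list and --remove scoped to a resource. A sketch (--remove asks for confirmation before deleting anything):
<syntaxhighlight lang="bash">
# Show only the ACLs attached to the webrequest topic
kafka acls --list --topic webrequest

# Remove a previously added ACL using the same selectors that were used to add it
kafka acls --remove --allow-principal User:CN=varnishkafka --producer --topic webrequest
</syntaxhighlight>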
== New Broker Install ==
As of 2014-06, all of our brokers have 12 2TB disks. The first two disks have a 30GB RAID 1 / and a 1GB RAID 1 swap. Partman will create these partitions when the node is installed. Partman is not fancy enough to do the rest, so you will have to do this manually. Copy/pasting the following should do everything necessary to set up the broker data partitions.
<syntaxhighlight lang="bash">
data_directory='/var/spool/kafka'
# [..cut..]
done
</syntaxhighlight>
Note: ext4 settings were taken from recommendations found here: https://kafka.apache.org/08/ops.html
** 5. Repeat steps 1 - 3 and compare results to previous version. There should be no (negative) change.
* When doing production upgrade, document all steps in a deployment plan. Review and then execute the plan with a peer. Take notes on all steps along the way, including execution times for each step. This allows for easier documentation and correlations later if there are any problems. Be sure to keep an eye on the [http://grafana.wikimedia.org/#/dashboard/db/kafka Kafka dashboard] while deploying.
== MirrorMaker == | |||
Kafka MirrorMaker is a glorified Kafka consumer -> producer process. It consumes messages from a source Kafka cluster, and produces them to a destination Kafka cluster. The messages themselves are thus 'reproduced' as new messages; 'mirroring' is therefore different from 'replication'.
MirrorMaker is a peerless Kafka consumer group. Multiple processes belonging to the same Kafka consumer group can run, and will automatically balance load between themselves. It is safe to stop some or all MirrorMaker processes at any time, as long as they are restarted within the smallest Kafka topic retention time (7 days) with enough time to catch back up. | |||
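Because MirrorMaker is just a consumer group on the source cluster, its lag can be checked with the consumer group tooling described above. The group name below is purely illustrative; use the one configured for the MirrorMaker instance you care about:
<syntaxhighlight lang="bash">
# Run against a broker of the *source* cluster; shows per-partition lag for the mirror
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 \
    --describe --group kafka-mirror-main-eqiad_to_jumbo-eqiad
</syntaxhighlight>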
=== main mirroring === | |||
We use Kafka MirrorMaker for datacenter failover between the two main Kafka clusters in eqiad and codfw. These clusters mirror all of their datacenter-prefixed topics to each other. Original topics in main-eqiad are prefixed with 'eqiad.', and original topics in main-codfw are prefixed with 'codfw.'. This allows whitelisting of the original topics in each source cluster. E.g. the MirrorMaker instance running on main-codfw nodes, named main-eqiad_to_main-codfw, consumes only topics that match eqiad.* from main-eqiad and produces them to main-codfw. The reciprocal MirrorMaker instance, main-codfw_to_main-eqiad, running on main-eqiad nodes, does the opposite.
Since mirroring here is primarily used for datacenter failover, a short (less than a few days) downtime will have no practical impact (as long as there is no primary data center switch during the downtime). After a primary datacenter switch, the codfw.* prefixed topics will start being used in main-codfw, instead of the eqiad.* ones in main-eqiad. | |||
=== jumbo mirroring === | |||
We also use MirrorMaker to consume all of the topics from the main clusters to jumbo-eqiad for analytics and posterity purposes. The jumbo-eqiad nodes run a MirrorMaker instance called main-eqiad_to_jumbo-eqiad. This instance consumes almost all topics from main-eqiad, including the codfw.* prefixed ones that were mirrored by the main-codfw_to_main-eqiad MirrorMaker instance. These topics are then ingested into Hadoop for analytics usage. | |||
Note: As of 2018-07, the job queue and change-prop topics are not needed for anything in the jumbo-eqiad cluster. These topics are bursty and higher volume than the other topics in the main Kafka clusters. We've had problems with MirrorMaker and some of these topics in the past. If there are MirrorMaker errors where MirrorMaker fails producing messages in one of these topics, it is safe to [https://github.com/wikimedia/puppet/blob/35d51f1db21a1b626a046656dce7d965e3b47a87/hieradata/role/common/kafka/jumbo/broker.yaml#L41 blacklist the topic]. | |||
== Broker Migrations == | |||
=== Kafka-logging === | |||
The below migration process assumes a new host will be assuming the broker ID of the host being migrated away from. | |||
''note: the current/old broker will be referred to as "source_host" and the new host being migrated to will be referred to as "dest_host"'' | |||
# Write/merge homer templates/cr/firewall.conf patch to include new broker(s) addresses | |||
# Update broker list in eventgate-logging to include new broker(s) addresses | |||
## helmfile.d/services/eventgate-logging-external/values-eqiad.yaml | |||
## helmfile.d/services/eventgate-logging-external/values-codfw.yaml | |||
## helmfile.d/services/eventgate-logging-external/values-staging.yaml | |||
## helmfile.d/services/eventgate-logging-external/values.yaml | |||
# Downtime source_host in icinga | |||
# Disable notifications for dest_host (via hiera patch) | |||
# Downtime icinga “under replicated partitions” checks on all members of the kafka cluster undergoing maintenance (e.g. all kafka-logging eqiad broker hosts) | |||
# source_host: Disable puppet with message | |||
# dest_host: Disable puppet with message | |||
# source_host: stop & mask service kafka | |||
# Gerrit/puppetmaster: Merge “kafka-logging: replace source_host broker with dest_host" (example: https://gerrit.wikimedia.org/r/c/operations/puppet/+/677009) | |||
# Conf1*: run puppet agent | |||
# Run puppet agent manually on remaining kafka cluster hosts | |||
# run puppet agent on prometheus nodes | |||
# run puppet agent on grafana nodes | |||
# dest_host: enable puppet & run puppet agent | |||
# Ensure Kafka and related services come up cleanly (watch the Grafana dashboard for this Kafka cluster and watch consumer lag; it may be necessary to reload Grafana in the browser to pick up new cluster hosts)
# Give Kafka cluster time to sync and settle down | |||
# if replica imbalance does not correct itself, issue a reelection with `kafka preferred-replica-election` | |||
# Ensure dest_host green in icinga | |||
# Enable dest_host notifications via hiera (revert patch from step 4) | |||
# run puppet on source_host | |||
== Rebalance topic partitions to new brokers == | |||
One interesting use case for Kafka is how to rebalance topic partitions in a cluster when new brokers are added to expand it. We worked on this use case two times: T225005 and T255973. | |||
Kafka by default does not move any partition from one broker to another one unless explicitly told to do it, via <code>kafka reassign-partitions</code>. The tool is very powerful and generic, but the operator needs to come up with a plan about what partitions to move (from broker A to broker B), so tools like [https://github.com/DataDog/kafka-kit/tree/master/cmd/topicmappr topicmappr]'s "rebuild" may be useful to generate a plan. A plan is basically a list of json files (one for each topic) that can be given as input to <code>kafka reassign-partitions</code>. The tool supports multiple criteria to generate a plan, like pulling metrics from Prometheus, etc.., but for our use cases its default behavior leads to good results (basically move partitions evenly across a set of new brokers). | |||
More details about how topicmappr's rebuild works: https://github.com/DataDog/kafka-kit/wiki/Rebuild-command | |||
The overall idea is the following: | |||
* Find a list of topics that you want to target, for example the ones having a certain minimum level of traffic (like >= 10 mgs/s). Something like [https://thanos.wikimedia.org/graph?g0.range_input=1h&g0.max_source_resolution=0s&g0.expr=sort_desc(topk(1000%2C sum(irate(kafka_server_BrokerTopicMetrics_MessagesIn_total{kafka_cluster%3D"main-codfw"%2C topic%3D~".%2B"}[5m])) by (topic)) > 0)&g0.tab=1 this] may be a good starting point. | |||
* Use topicmappr to generate a plan, namely a list of json files (one for each topic/partition combination) that are readable by <code>kafka reassign-partitions</code> | |||
* Execute <code>kafka reassign-partitions</code> manually for all json files, one at a time (see the example after this list). The command returns immediately once executed, but kafka may take a lot of time to move the target partition to another broker (depending on size/throttling/etc.).
**'''Save the output of the above command since it prints the current status, which can be used to rollback if needed (see below).'''
**Use the --throttle option for kafka-reassign-partitions to limit the bandwidth used for data transfers (remember to add it every time you invoke the command, otherwise it will be reset to the default).
**Use the --verify option to check the status of a rebalance periodically. The --verify option also removes the throttle once the rebalance is completed, restoring the broker to its previous state (otherwise other replication tasks might suffer as well).
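Putting those pieces together, a typical run for one of the generated plan files might look like this (the file name and throttle value are illustrative):
<syntaxhighlight lang="bash">
# Start the reassignment with a ~50 MB/s replication throttle
kafka reassign-partitions --reassignment-json-file plan-webrequest_text.json --execute --throttle 50000000

# Re-run periodically; once every partition reports "completed successfully" this also removes the throttle
kafka reassign-partitions --reassignment-json-file plan-webrequest_text.json --verify
</syntaxhighlight>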
== Rebalance topic partitions == | |||
This section is related to various tips to use with <code>kafka reassign-partitions</code>. If you need to move partitions only due to new empty brokers, please see the above sections. | |||
The tool gets as input one json file for each topic, formatted like the following:<syntaxhighlight lang="json"> | |||
{ | |||
"partitions": [ | |||
{"topic": "eqiad.resource-purge", "partition": 0, "replicas": [2001,2005,2003]}, | |||
{"topic": "eqiad.resource-purge", "partition": 1, "replicas": [2002,2004,2001]}, | |||
{"topic": "eqiad.resource-purge", "partition": 2, "replicas": [2003,2004,2002]}, | |||
{"topic": "eqiad.resource-purge", "partition": 3, "replicas": [2004,2001,2002]}, | |||
{"topic": "eqiad.resource-purge", "partition": 4, "replicas": [2005,2001,2002]} | |||
], | |||
"version":1 | |||
} | |||
</syntaxhighlight>
The idea is, for every partition, to instruct Kafka about which brokers should get it. The tool should not be used to increase the number of partitions or their replicas, so it is essential to craft a json document that is consistent with the current topic state (number of partitions, and number of replicas for each partition). Shuffling where the replicas will be placed helps in redistributing the traffic, since more brokers will be able to act as partition leaders and the Kafka producers will be able to spread their traffic across more brokers.
Caveat: the first replica listed is the one that will most likely become the partition leader. An interesting use case could be this:
<syntaxhighlight lang="json">
{ | |||
"partitions": [ | |||
{"topic": "eqiad.resource-purge", "partition": 0, "replicas": [2001,2005,2003]}, | |||
{"topic": "eqiad.resource-purge", "partition": 1, "replicas": [2002,2004,2001]}, | |||
{"topic": "eqiad.resource-purge", "partition": 2, "replicas": [2004,2003,2002]}, | |||
{"topic": "eqiad.resource-purge", "partition": 3, "replicas": [2004,2001,2002]}, | |||
{"topic": "eqiad.resource-purge", "partition": 4, "replicas": [2005,2001,2002]} | |||
], | |||
"version":1 | |||
} | |||
</syntaxhighlight>
Here only the partition 2 layout has been changed (its first replica is different from the example above). If we apply this configuration, Kafka will likely assign partitions 2 and 3 to the same broker (2004) even if another broker would be the best choice (2003). Kafka balances partitions based on their number; it doesn't know much about how the operator wants to shape each broker's traffic, and it trusts the first replica to be a good indication of what the operator wants.
Once you have executed <code>kafka reassign-partitions --execute</code> please run the same command with <code>--verify</code> until the partitions are reassigned (it clears out any throttling setting if applied). | |||
Note: the <code>--throttle</code> option is really nice if you have big topics and you don't want to risk saturating the broker's bandwidth. If applied please remember the suggestion written above about <code>--verify</code>. | |||
== Rollback/Stop topic partitions rebalance == | |||
Please do this only if you are in a really messed up state. A partition move will likely cause some metrics to show temporary issues, like under replicated partitions etc., but eventually they auto-resolve. What should the operator do if something happens while the partition is being moved? The solution found by the Analytics/DE team at the time is the following one:
* Stop the current rebalancing in progress (since it blocks other ones) | |||
<syntaxhighlight lang="bash"> | |||
# see https://phabricator.wikimedia.org/T255973#6762713 | |||
# This of course assuming main-eqiad/jumbo/etc.., | |||
# please use codfw's zookeeper cluster when needed. | |||
ssh conf1004.eqiad.wmnet | |||
elukey@conf1004:~$ sudo -u zookeeper /usr/share/zookeeper/bin/zkCli.sh | |||
rmr /kafka/test-eqiad/admin/reassign_partitions | |||
rmr /kafka/test-eqiad/controller | |||
</syntaxhighlight>
The above will cause the Kafka controller (one broker elected at any given time) to lose its status, and the cluster will elect another one. From this point onward no more partitions will be reassigned.
* Rollback to the previous state (emitted by <code>kafka-reassign-partitions</code> when executed) | |||
== Renew TLS certificate == | |||
The brokers of Kafka clusters (that we manage in production) are all using TLS certificates to secure traffic between each other and the clients. At the time of writing (January 2022) there are two main types of certificates used: | |||
* '''Puppet''' - all the brokers use the same TLS certificate created with cergen, and trust only the Puppet CA. | |||
* '''PKI Kafka Intermediate''' - every broker has its own certificate (hostname based) issued by a special PKI intermediate CA. | |||
The latter is preferred, but the effort to migrate all the clusters to it is still ongoing (see T291905 for more info). In order to check what certificate type is used by a broker, just ssh to it and run:
<syntaxhighlight lang="bash">
echo y | openssl s_client -connect $(hostname -f):9093 | openssl x509 -issuer -noout
</syntaxhighlight>
If the CA mentioned is:
* the Puppet one, then you'll need to follow [[Cergen#Update_a_certificate]] and deploy the new certificate to all nodes. | |||
* the Kafka PKI Intermediate one, then in theory a new certificate should be issued a few days before the expiry and puppet should replace the Kafka keystore automatically (under /etc/kafka/ssl).
In both cases, a roll restart of the brokers is needed to force them to pick up the new certificates. Please use the related Kafka cookbook :) | |||
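To double check the expiry date of whatever certificate a broker is currently serving (useful before and after the roll restart), the openssl check above can be extended:
<syntaxhighlight lang="bash">
# Run on the broker itself; prints the issuer and the notAfter (expiry) date of the live certificate
echo | openssl s_client -connect $(hostname -f):9093 2>/dev/null | openssl x509 -noout -issuer -enddate
</syntaxhighlight>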
=== Is there a way to reload the keystores without restarting? === | |||
In theory it should be sufficient to execute the <code>kafka-config</code> command on every broker to force a reload, but in practice this doesn't work with our version of kafka, as described in https://phabricator.wikimedia.org/T299409
Safe Broker Restarts
Since partitions have at least 2 (usually 3) replicas, you should be able to restart a broker without losing any messages. Brokers, consumer, and producers will automatically rebalance themselves when a broker dies, but it is nice to allow them to do so gracefully. service kafka stop will perform a graceful shutdown. Before you run this, you should make sure that any topics for which the target broker is the leader also has In Sync Replicas:
root@kafka1014:~# kafka topics --describe ... Topic:webrequest_text PartitionCount:12 ReplicationFactor:3 Configs: Topic: webrequest_text Partition: 0 Leader: 18 Replicas: 18,22,12 Isr: 18,22,12 Topic: webrequest_text Partition: 1 Leader: 20 Replicas: 20,12,13 Isr: 20,13,12 Topic: webrequest_text Partition: 2 Leader: 22 Replicas: 22,13,14 Isr: 22,13,14 Topic: webrequest_text Partition: 3 Leader: 12 Replicas: 12,14,18 Isr: 18,12,14 Topic: webrequest_text Partition: 4 Leader: 13 Replicas: 13,18,20 Isr: 18,13,20 Topic: webrequest_text Partition: 5 Leader: 14 Replicas: 14,20,22 Isr: 22,20,14 Topic: webrequest_text Partition: 6 Leader: 18 Replicas: 18,12,13 Isr: 18,13,12 Topic: webrequest_text Partition: 7 Leader: 20 Replicas: 20,13,14 Isr: 20,13,14 Topic: webrequest_text Partition: 8 Leader: 22 Replicas: 22,14,18 Isr: 22,18,14 Topic: webrequest_text Partition: 9 Leader: 12 Replicas: 12,18,20 Isr: 20,18,12 Topic: webrequest_text Partition: 10 Leader: 13 Replicas: 13,20,22 Isr: 22,20,13 Topic: webrequest_text Partition: 11 Leader: 14 Replicas: 14,22,12 Isr: 22,12,14 ... # partitions with Leader: 14 have several brokers in the ISR for that partition. It is safe to stop kafka1014 root@kafka1014:~# service kafka stop
Notice how (eventually) after the broker stops, broker 14 is no longer the leader for any topics when you run kafka topics --describe.
Once you are ready to start the broker back up, you can do so with a simple service kafka start.
It will likely take a few minutes for the broker to recover after restarting. It needs to check all of its recent logs to make sure that it doesn't have any incomplete messages. It will also start replicating partitions from where it left off when it was restarted. Keep checking kafka topics --describe until all topic-partitions have all brokers in the isr. Once the topic-partitions are up to date on all brokers, you can start a replica election to balance the leaders across brokers. NOTE: auto leader rebalancing is enabled, but after any broker restarts you should probably run a kafka preferred-replica-election, as it is possible that high volume partitions might not resync fast enough to be considered by the auto rebalancer. See T207768 for more info.
Replica Elections
To trigger a leadership rebalance manually, do the following.
List topics to see the current leader assignments:
root@kafka1014:~$ kafka topics --describe Topic:webrequest_bits PartitionCount:12 ReplicationFactor:3 Configs: Topic: webrequest_bits Partition: 0 Leader: 22 Replicas: 22,12,18 Isr: 12,18,22 Topic: webrequest_bits Partition: 1 Leader: 12 Replicas: 12,18,21 Isr: 12,18,21 Topic: webrequest_bits Partition: 2 Leader: 18 Replicas: 18,21,22 Isr: 18,22,21 Topic: webrequest_bits Partition: 3 Leader: 21 Replicas: 21,22,12 Isr: 12,22,21 Topic: webrequest_bits Partition: 4 Leader: 22 Replicas: 22,18,21 Isr: 18,22,21 Topic: webrequest_bits Partition: 5 Leader: 12 Replicas: 12,21,22 Isr: 12,22,21 Topic: webrequest_bits Partition: 6 Leader: 18 Replicas: 18,22,12 Isr: 12,18,22 Topic: webrequest_bits Partition: 7 Leader: 21 Replicas: 21,12,18 Isr: 12,18,21 Topic: webrequest_bits Partition: 8 Leader: 22 Replicas: 22,21,12 Isr: 12,22,21 Topic: webrequest_bits Partition: 9 Leader: 12 Replicas: 12,22,18 Isr: 12,18,22 Topic: webrequest_bits Partition: 10 Leader: 18 Replicas: 18,12,21 Isr: 12,18,21 Topic: webrequest_bits Partition: 11 Leader: 21 Replicas: 21,18,22 Isr: 18,22,21 Topic:webrequest_mobile PartitionCount:12 ReplicationFactor:3 Configs: Topic: webrequest_mobile Partition: 0 Leader: 12 Replicas: 12,21,22 Isr: 12,22,21 Topic: webrequest_mobile Partition: 1 Leader: 18 Replicas: 18,22,12 Isr: 12,18,22 Topic: webrequest_mobile Partition: 2 Leader: 21 Replicas: 21,12,18 Isr: 12,18,21 Topic: webrequest_mobile Partition: 3 Leader: 22 Replicas: 22,18,21 Isr: 18,22,21 Topic: webrequest_mobile Partition: 4 Leader: 12 Replicas: 12,22,18 Isr: 12,18,22 Topic: webrequest_mobile Partition: 5 Leader: 18 Replicas: 18,12,21 Isr: 12,18,21 Topic: webrequest_mobile Partition: 6 Leader: 21 Replicas: 21,18,22 Isr: 18,22,21 Topic: webrequest_mobile Partition: 7 Leader: 22 Replicas: 22,21,12 Isr: 12,22,21 Topic: webrequest_mobile Partition: 8 Leader: 12 Replicas: 12,18,21 Isr: 12,18,21 Topic: webrequest_mobile Partition: 9 Leader: 18 Replicas: 18,21,22 Isr: 18,22,21 Topic: webrequest_mobile Partition: 10 Leader: 21 Replicas: 21,22,12 Isr: 12,22,21 Topic: webrequest_mobile Partition: 11 Leader: 22 Replicas: 22,12,18 Isr: 12,18,22 Topic:webrequest_text PartitionCount:12 ReplicationFactor:3 Configs: Topic: webrequest_text Partition: 0 Leader: 22 Replicas: 22,21,12 Isr: 12,22,21 Topic: webrequest_text Partition: 1 Leader: 12 Replicas: 12,22,18 Isr: 12,18,22 Topic: webrequest_text Partition: 2 Leader: 18 Replicas: 18,12,21 Isr: 12,18,21 Topic: webrequest_text Partition: 3 Leader: 21 Replicas: 21,18,22 Isr: 18,22,21 Topic: webrequest_text Partition: 4 Leader: 22 Replicas: 22,12,18 Isr: 12,18,22 Topic: webrequest_text Partition: 5 Leader: 12 Replicas: 12,18,21 Isr: 12,18,21 Topic: webrequest_text Partition: 6 Leader: 18 Replicas: 18,21,22 Isr: 18,22,21 Topic: webrequest_text Partition: 7 Leader: 21 Replicas: 21,22,12 Isr: 12,22,21 Topic: webrequest_text Partition: 8 Leader: 22 Replicas: 22,18,21 Isr: 18,22,21 Topic: webrequest_text Partition: 9 Leader: 12 Replicas: 12,21,22 Isr: 12,22,21 Topic: webrequest_text Partition: 10 Leader: 18 Replicas: 18,22,12 Isr: 12,18,22 Topic: webrequest_text Partition: 11 Leader: 21 Replicas: 21,12,18 Isr: 12,18,21 Topic:webrequest_upload PartitionCount:12 ReplicationFactor:3 Configs: Topic: webrequest_upload Partition: 0 Leader: 18 Replicas: 18,12,21 Isr: 12,18,21 Topic: webrequest_upload Partition: 1 Leader: 21 Replicas: 21,18,22 Isr: 22,18,21 Topic: webrequest_upload Partition: 2 Leader: 22 Replicas: 22,21,12 Isr: 12,22,21 Topic: webrequest_upload Partition: 3 Leader: 12 Replicas: 12,22,18 Isr: 
12,18,22 Topic: webrequest_upload Partition: 4 Leader: 18 Replicas: 18,21,22 Isr: 18,22,21 Topic: webrequest_upload Partition: 5 Leader: 21 Replicas: 21,22,12 Isr: 22,12,21 Topic: webrequest_upload Partition: 6 Leader: 22 Replicas: 22,12,18 Isr: 12,18,22 Topic: webrequest_upload Partition: 7 Leader: 12 Replicas: 12,18,21 Isr: 12,18,21 Topic: webrequest_upload Partition: 8 Leader: 18 Replicas: 18,22,12 Isr: 12,18,22 Topic: webrequest_upload Partition: 9 Leader: 21 Replicas: 21,12,18 Isr: 12,18,21 Topic: webrequest_upload Partition: 10 Leader: 22 Replicas: 22,18,21 Isr: 18,22,21 Topic: webrequest_upload Partition: 11 Leader: 12 Replicas: 12,21,22 Isr: 12,22,21
In this case, you can see that leaders are balanced across all brokers. If they weren't (E.g.: broker “21” not appearing as leader) you can ask Kafka to do a leader election by running the following command on one of the brokers (i.e.: no sudo; the broker chosen does not matter).
kafka preferred-replica-election
Wait few seconds, no more than a minute. If all goes well, kafka topics --describe again and you should see the leaders properly balanced.
Verify the leader of a topic
Kafka nomenclature recap: a generic queue is called 'topic', and each one of them can be split in multiple partitions that producers and consumers will use to spread the load. For example, let's use the kafka topics --describe command described above to inspect a topic state:
Topic:webrequest_text PartitionCount:24 ReplicationFactor:3 Configs:retention.bytes=375809638400
Topic: webrequest_text Partition: 0 Leader: 18 Replicas: 18,22,12 Isr: 22,18,12
Topic: webrequest_text Partition: 1 Leader: 20 Replicas: 20,12,13 Isr: 20,13,12
Topic: webrequest_text Partition: 2 Leader: 22 Replicas: 22,13,14 Isr: 13,14,22
Topic: webrequest_text Partition: 3 Leader: 12 Replicas: 12,14,18 Isr: 14,18,12
Topic: webrequest_text Partition: 4 Leader: 13 Replicas: 13,18,20 Isr: 20,13,18
[...CUT...]
Topic: webrequest_text Partition: 22 Leader: 20 Replicas: 20,22,12 Isr: 20,22,12
Topic: webrequest_text Partition: 23 Leader: 22 Replicas: 22,12,13 Isr: 13,22,12
The webrequest_text topic contains data coming from Varnishkafka, related to the cache text HTTP requests. It is composed by 24 partitions (you can see them as the real queues that consumers/producers will use), each of them replicated three times. Zooming in:
Topic: webrequest_text Partition: 0 Leader: 18 Replicas: 18,22,12 Isr: 22,18,12
Partition 0 has three replicas, stored on the kafka1018, kafka1012 and kafka1022 brokers. The broker that acts as leader for the partition is kafka1018, the other two have the duty to keep themselves in sync with it. The number of replicas in sync with the leader is described by the Isr field (short for In Sync Replicas), and of course the perfect state is when "Replicas" is the same as "Isr". Loosing one Isr replica may happen in case a broker goes down for some reason, and it is usually not a big deal if nothing else is happening (2/3 in sync replicas are enough to ensure resiliency).
Alter topic partitions number
Be careful when doing this, as it is not reversible. Consumers may be relying on a certain number of partitions, so make sure you are aware of any downstream consequences for consumers before doing this.
kafka topics --alter --topic my_topic_name --partitions 5
Alter topic retention settings
Our default is to keep data in a topic for 7 days. You can vary both the time and the size of a topic that should be kept. The time based retention setting is retention.ms and the size based retention setting is retention.bytes. To alter a topic config:
kafka configs --alter --entity-type topics --entity-name my_topic_name --add-config retention.ms=2678400000
To undo this and revert to the default setting, you can delete the config setting:
kafka configs --alter --entity-type topics --entity-name my_topic_name --delete-config retention.ms
Delete a topic
Kafka stores all the topic names in Zookeeper, so the easiest way to delete a topic is to use the following command from a Kafka host of the cluster in which the topic has been created (we'll use main-eqiad as example from now on):
#~ kafka topics --delete --topic thisisatest
kafka-topics --zookeeper conf1004.eqiad.wmnet,conf1005.eqiad.wmnet,conf1006.eqiad.wmnet/kafka/main-eqiad --delete --topic thisisatest
As you can see there is a bash script on every Kafka host that automatically expands the kafka prefixed commands with all the boring parameters like zookeeper hostnames, paths, etc.. Before explaining what the command does under the hood, here's a list of places in which a topic name may be found in Zookeeper (keeping main-eqiad as example):
- /kafka/main-eqiad/brokers/topics
- /kafka/main-eqiad/admin/delete_topics
- /kafka/main-eqiad/config/topics
The first path stores znodes that hold the status of a given topic/partition, in particular what brokers are handling it:
[zk: localhost:2181(CONNECTED) 14] get /kafka/main-eqiad/brokers/topics/eqiad.mediawiki.job.categoryMembershipChange/partitions/0/state
{"controller_epoch":96,"leader":1001,"version":1,"leader_epoch":102,"isr":[1003,1002,1001]}
[..cut..]
The second one contains the list of topics flagged to be deleted, for example after running the kafka topics --delete command stated above. The last one generic information about the topic:
[zk: localhost:2181(CONNECTED) 17] get /kafka/main-eqiad/config/topics/eqiad.mediawiki.job.categoryMembershipChange
{"version":1,"config":{}}
[..cut..]
So in a normal scenario, the kafka topics --delete command should take care of these three paths by itself, setting up znodes in a way to instruct the Kafka brokers to do the proper clean up work. During an outage we discovered that sometimes this doesn't work, for example in the presence of Kafka core bugs like https://issues.apache.org/jira/browse/KAFKA-7156. In that particular case, the Kafka brokers were not able to delete topics due to a filename length problem on the ext4 data disk partitions, and since zookeeper was not updated correctly, even manual rm commands were not doing the right thing, since during boot the brokers always try to re-create a directory structure under the data directory that resembles what stored in zookeeper. So during the outage the brokers were re-creating topics after each restart, ending up in the same problem over and over again, so we had to grab the list of topics to delete, and do the following for each one of them:
#/bin/bash
for topic in $(cat topics_to_delete)
do
echo "Deleting ${topic}"
/usr/share/zookeeper/bin/zkCli.sh rmr /kafka/main-eqiad/brokers/topics/${topic}
/usr/share/zookeeper/bin/zkCli.sh rmr /kafka/main-eqiad/config/topics/${topic}
/usr/share/zookeeper/bin/zkCli.sh rmr /kafka/main-eqiad/admin/delete_topics/${topic}
done
The above procedure is the nuclear option to use if nothing else works, and should be done with all brokers stopped to avoid them interfering with the deletion. After zookeeper is clean, the Kafka brokers can be restarted one at the time and they should be able to boot correctly without any issue.
Handling a downed broker
It happened in the past that events like a single disk failure in the analytics-eqiad cluster caused tons of alarms fired due to a Kafka broker going down (we are not using RAID but JBOD for analytics-eqiad, this is why a disk failure cause troubles). Generally, one Kafka broker down in our clusters does not cause any harm to the overall Kafka availability, but it wise to double check to be sure when that happens. The above sections (especially "Verify the state of a topic") are a good start to make sure that the cluster is healthy, moreover it is wise to check the various dashboards listed in Kafka#Monitoring.
Usually the impact of a single broker down is nothing more than alarms for replicas not in sync, but as we discussed in the above sections this is not a critical situation if nothing else explodes at the same time. Just systemctl mask the kafka daemon and file a task to fix the hardware issue. During this time, daemons like Varnishkafka could log several errors stating that the broker is down, but librdkafka is strong enough to guarantee that no message is lost.
We are not aware of any bug or software issue that might happen to a Kafka broker.
Recovering a laggy broker replica
If a Kafka Broker goes offline for a long while, it will likely come back online and be far behind in logs. It will need to catch up on logs from remaining brokers before it can be put back into the ISR. During normal operation, replicas should be able to stay in sync with each other. But when one broker is far behind, you may need to tweak settings to encourage Kafka to spend more resources keeping replicas up to date.
num.replica.fetchers
This setting controls the number of threads dedicated to fetching logs from other replicas. Bump this number up temporarily and restart Kafka to try to get it to consume faster.
replica.fetch.max.bytes
This is the number of bytes that each fetch request will attempt to consume from each topic-partition. The actual number of bytes being requested at a time will be this multiplied by the number of topic-partitions. Be careful not to set this too high.
Checking consumer offsets
As of November 2015, LinkedIn's Burrow is installed and running on krypton.eqiad.wmnet. It is configured to email analytics admins if consumers start lagging. You can also query it directly via its HTTP interface. E.g., to see if a particular consumer group is lagging, and where its latest offset commits are:
curl http://krypton.eqiad.wmnet:8000/v2/kafka/eqiad/consumer/mysql-m4-master/topic/eventlogging-valid-mixed {"error":false,"message":"consumer group topic offsets returned","offsets":[79014572,79014599,79003923,79014602,79014593,79014599,79014574,79014599,79003640,79014585,79014592,79014597]}
CLI
Run kafka-consumer-groups
with no flags to get a full list of options.
Get a list of groupsː
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --list
Get a list of topics for a group with their consumer offsets:
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group <group>
Setting an offset to latest for a topic in group:
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --reset-offsets --to-latest --group <group> --topic <topic> --dry-run
When ready to execute the operation, change --dry-run
to --execute
.
The offset for a topic partition can be specified as well: --topic <topic>:<partition>(,<partition>)
Purge broker logs
After being off for a while, when a broker rejoins its cluster it will replicate anything it has missed while it was off. Kafka's log.retention.hours setting is applied by looking at the mtime (or ctime?) of its log data files. On a recently started broker, these files are written later than usual, as replication catches back up. These files will have mtimes later than the produce time of the messages inside them, and as such their deletion may be delayed. This can cause disks to fill up more than usual.
If you receive an alarm like:
<icinga-wm_> PROBLEM - Disk space on kafka1012 is CRITICAL: DISK CRITICAL - free space: /var/spool/kafka/b 73705 MB (3% inode=99%): /var/spool/kafka/f 127290 MB (6% inode=99%)
You might want to purge some old logs from the broker's partitions. One strategy is to lower down the log retention rate. You can do this either per topic dynamically or statically for all topics by editing server.properties.
Temporarily Modify Per Topic Retention Settings
If you are just trying to buy some time until the global retention setting takes effect you can dynamically alter configuration for a topic. If you do this for a high volume topic, you might get Kafka to delete just enough data on disk to buy you enough time.
# Set rentention.ms for the high volume webrequest upload topic. This will delete any log files on disk older than 48 hours.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --add-config retention.ms=172800000
# Set rentention.bytes for the high volume webrequest text topic partitions.
# Please note the fact that you will have to set the maximum size of the
# topic's partitions, not the topic as a whole!
# This will delete any log files on disk bigger than 536GB.
kafka configs --alter --entity-type topics --entity-name webrequest_text --add-config retention.bytes=536870912000
# Wait until brokers delete data. Make sure you have enough room to spare. If not, consider setting retention.ms for another topic.
# Once brokers have deleted data, it is safe to delete the per topic config override to reset it to the global default.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --delete-config retention.ms
Temporarily Edit Global Retention Settings
If you need data for many topics deleted, it may be worth temporarily changing the global retention setting. This is more disruptive than doing so dynamically, since you must restart brokers for the config to be applied.
sudo puppet agent --disable
sudo vim /etc/kafka/server.properties # lower down log.retention.hours=168
sudo service kafka restart
Please remember to log your work in #wikimedia-operations. After checking /var/log/kafka/server.log for a confirmation of the delete actions (together with a df -h of course) please restore the previous config:
sudo vim /etc/kafka/server.properties # reset to default log.retention.hours=168
sudo service kafka restart
sudo puppet agent --enable
After doing this for a broker, you should remember to run kafka preferred-replica-election to rebalance topic-partition leadership.
Kafka Certificates
Newer versions of Kafka support TLS encryption, authentication and authorization. Certificates and keys are managed using cergen. Certificates are signed by our Puppet CA and distributed using Puppet. To create a new client key and certificate, add an entry to a cergen manifest file and run cergen with the --generate option as described on the cergen documentation page. git add and commit the files to the puppet private repository, and then distribute the relevant files via puppet and configure your client.
Kafka ACLs
Kafka ACLs are used to restrict access to Kafka cluster operations, Kafka topics, and Kafka consumer groups. By default, if an ACL exists for a specific resource, e.g. a topic, then all operations on that resource will be denied to any principal (AKA certificate) not explicitly listed for that resource. We want to allow anonymous unencrypted use of most Kafka topics, but restrict certain others. For example, we'd like to restrict writes to any webrequest topic to only varnishkafka producers, but still allow anyone to consume. To do this, we need the proper invocation of the kafka acls command. (Much of this was originally figured out and documented in T167304.)
For certificate based authentication, we need to specify the full x509 DistinguishedName. To keep things simple, we generate subject-less certificates, so that we only have to use the CommonName in the ACLs. Before we set up any restricted ACLs at all, we need to allow defaults for the ANONYMOUS user:
# Allow ANONYMOUS to produce to any topic
kafka acls --add --allow-principal User:ANONYMOUS --producer --topic '*'
# Allow ANONYMOUS to consume from any topic in any consumer group
kafka acls --add --allow-principal User:ANONYMOUS --consumer --topic '*' --group '*'
# After this, kafka acls --list should show:
kafka acls --list
Current ACLs for resource `Group:*`:
User:ANONYMOUS has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:*`:
User:ANONYMOUS has Allow permission for operations: Describe from hosts: *
User:ANONYMOUS has Allow permission for operations: Write from hosts: *
User:ANONYMOUS has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Cluster:kafka-cluster`:
User:ANONYMOUS has Allow permission for operations: Create from hosts: *
Now we can restrict operations on resources to other principals. When a client communicates over the SSL port 9093, it will attempt to authenticate with its certificate's DN. You need to add ACLs for that DN if you want that client to be able to use Kafka at all. For this example, we'll allow User:CN=varnishkafka to produce to the webrequest topic, restrict anyone else from producing to webrequest, but still allow anyone to read from webrequest. Note that we've used wildcard topic and group names here. Kafka does not (yet?) support full glob wildcards. It is all or nothing! E.g. --topic 'webrequest_*' will not work.
# Allow User:CN=varnishkafka to be a producer for the webrequest topic
kafka acls --add --allow-principal User:CN=varnishkafka --producer --topic webrequest
# Deny unauthenticated users the ability to write to the webrequest topic
kafka acls --add --deny-principal User:ANONYMOUS --operation Write --topic webrequest
# Now we have the following ACLs defined:
kafka acls --list
Current ACLs for resource `Group:*`:
User:ANONYMOUS has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:*`:
User:ANONYMOUS has Allow permission for operations: Describe from hosts: *
User:ANONYMOUS has Allow permission for operations: Write from hosts: *
User:ANONYMOUS has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:webrequest`:
User:CN=varnishkafka has Allow permission for operations: Write from hosts: *
User:CN=varnishkafka has Allow permission for operations: Describe from hosts: *
User:ANONYMOUS has Deny permission for operations: Write from hosts: *
Current ACLs for resource `Cluster:kafka-cluster`:
User:ANONYMOUS has Allow permission for operations: Create from hosts: *
User:CN=varnishkafka has Allow permission for operations: Create from hosts: *
Because no ACL exists for Topic:webrequest Read operation, any User, including User:ANONYMOUS, can still consume from the webrequest topic.
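To inspect the ACLs for a single resource rather than the full list, the same wrapper accepts a resource filter, for example:
# Show only the ACLs defined on the webrequest topic
kafka acls --list --topic webrequest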
New Broker Install
Partitioning
As of 2014-06, all of our brokers have 12 2TB disks. The first two disks have a 30GB RAID 1 /, and a 1GB RAID 1 swap. Partman will create these partitions when the node is installed. Partman is not fancy enough to do the rest, so you will have to do this manually. Copy/pasting the following should do everything necessary to set up the broker data partitions.
data_directory='/var/spool/kafka'
# sda3 and sdb3 are already created, format them as ext4
for disk in /dev/sd{a,b}; do sudo fdisk $disk <<EOF
t
3
83
w
EOF
done
# sd{c..l}1 are full physical ext4 partitions
for disk in /dev/sd{c,d,e,f,g,h,i,j,k,l}; do sudo fdisk $disk <<EOF
n
p
1
w
EOF
done
# run partprobe to make the kernel pick up the partition changes.
apt-get install parted
partprobe
# mkfs.ext4 all data partitions
for disk_letter in a b c d e f g h i j k l; do
# use partition 3 on sda and sdb
if [ "${disk_letter}" = 'a' -o "${disk_letter}" = 'b' ]; then
partition_number=3
else
partition_number=1
fi
partition="/dev/sd${disk_letter}${partition_number}"
disk_data_directory="${data_directory}/${disk_letter}"
# Run mkfs.ext4 in background so we don't have to wait
# for this to complete synchronously.
mkfs.ext4 $partition &
done
### IMPORTANT!
# Wait for all the above ext4 filesystems to be formatted
# before running the following loop.
#
sudo mkdir -p $data_directory
for disk_letter in a b c d e f g h i j k l; do
# use partition 3 on sda and sdb
if [ "${disk_letter}" = 'a' -o "${disk_letter}" = 'b' ]; then
partition_number=3
else
partition_number=1
fi
partition="/dev/sd${disk_letter}${partition_number}"
mount_point="${data_directory}/${disk_letter}"
# don't reserve any blocks for OS on these partitions
tune2fs -m 0 $partition
### TODO: Edit this script to use UUID in fstab instead of $partition
# make the mount point
mkdir -pv $mount_point
grep -q $mount_point /etc/fstab || echo -e "# Kafka log partition ${disk_letter}\n${partition}\t${mount_point}\text4\tdefaults,noatime,data=writeback,nobh,delalloc\t0\t2" | sudo tee -a /etc/fstab
mount -v $mount_point
done
Note: ext4 settings were taken from recommendations found here: https://kafka.apache.org/08/ops.html
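A quick sanity check after the above (a sketch; it just confirms every data partition is mounted and has an fstab entry):
# All 12 data partitions should show up mounted under /var/spool/kafka
df -h | grep /var/spool/kafka
grep kafka /etc/fstab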
Swapping broken disk
Similar to Hadoop/Administration#Swapping_broken_disk, new disks on analytics Kafka brokers need some megacli tweaks before they are usable. This was done in T136933; the following are the steps that were taken then.
These Kafka brokers have 12 disks in JBOD via MegaRAID. We'll need to clear foreign configuration from the old disk, and then mark this disk to be used as JBOD.
Procedure:
- Check the status of the disks after the swap using the following commands:
# Check general info about disks attached to the RAID and their status.
sudo megacli -PDList -aAll
# Get the firmware status only, for a quick peek at disk status:
sudo megacli -PDList -aAll | grep Firm
# Check Virtual Drive info
sudo megacli -LDInfo -LAll -aAll
- If you see "Firmware state: Unconfigured(bad)", fix it with:
# From the previous commands you should be able to fill in the variables
# with the values of the disk's properties indicated below:
#   X => Enclosure Device ID
#   Y => Slot Number
#   Z => Controller number
megacli -PDMakeGood -PhysDrv[X:Y] -aZ

# Example:
# otto@kafka1012:~$ sudo megacli -PDList -aAll | egrep "Adapter|Enclosure Device ID:|Slot Number:|Firmware state"
# Adapter #0
# Enclosure Device ID: 32
# Slot Number: 5
# Firmware state: Online, Spun Up
# [..content cut..]
megacli -PDMakeGood -PhysDrv[32:5] -a0
- Check for Foreign disks and fix them if needed:
server:~# megacli -CfgForeign -Scan -a0
There are 1 foreign configuration(s) on controller 0.

server:~# megacli -CfgForeign -Clear -a0
Foreign configuration 0 is cleared on controller 0.
- Add the disk as JBOD:
megacli -PDMakeJBOD -PhysDrv[32:5] -a0
- You should now be able to see the disk using fdisk. Add a partition and filesystem to it to complete the work:
sudo fdisk /dev/sdf   # ... make a new primary partition filling up the whole disk
sudo mkfs.ext4 /dev/sdf1
sudo tune2fs -m 0 /dev/sdf1
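Once the filesystem is created, the partition can be mounted back on its original mount point. A sketch, assuming the fstab entry created at install time is still present (the drive letter is just an example for the swapped disk):
sudo mount -v /var/spool/kafka/f
df -h /var/spool/kafka/f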
Very useful info contained in: http://hwraid.le-vert.net/wiki/LSIMegaRAIDSAS
Upgrade Checklist
The Analytics team experienced data loss and a lot of headaches when performing a routine Kafka upgrade from 0.8.2.0 -> 0.8.2.1 in August 2015. The following is a deployment checklist we came up with as part of the postmortem after that outage. When upgrading, please follow this checklist before you proceed in production.
- Update (or remove) api_version in kafka_clusters hiera hash in common.yaml
- Check Release Notes for new versions. e.g. https://archive.apache.org/dist/kafka/0.8.2.1/RELEASE_NOTES.html
- Check Apache JIRA for bugs that may affect new version(s). e.g. https://issues.apache.org/jira/browse/KAFKA-2496?jql=project%20%3D%20KAFKA%20AND%20affectedVersion%20%3D%200.8.2.1
- Stress test to see if varnishkafka latency goes up. It may be difficult to do this, but it is worth a try.
- Set up a varnish+varnishkafka instance and Kafka cluster in labs (if there is not one in deployment-prep already).
- 1. Use ab (Apache Benchmark) to force varnishkafka to send requests as fast as you can.
- 2. Record rtt times in varnishkafka stats.json
- 3. Record data file sizes for Kafka partitions on Kafka brokers in /var/spool/kafka/
- 4. Upgrade Kafka cluster
- 5. Repeat steps 1 - 3 and compare results to previous version. There should be no (negative) change.
- When doing the production upgrade, document all steps in a deployment plan. Review and then execute the plan with a peer. Take notes on all steps along the way, including execution times for each step; this allows for easier documentation and correlation later if there are any problems. Be sure to keep an eye on the Kafka dashboard while deploying.
MirrorMaker
Kafka MirrorMaker is a glorified Kafka consumer -> producer process. It consumes messages from a source Kafka cluster, and produces them to a destination Kafka cluster. The messages themselves are thus 'reproduced' as new messages, so 'mirroring' is different from 'replication'.
MirrorMaker is a peerless Kafka consumer group. Multiple processes belonging to the same Kafka consumer group can run, and will automatically balance load between themselves. It is safe to stop some or all MirrorMaker processes at any time, as long as they are restarted within the smallest Kafka topic retention time (7 days) with enough time to catch back up.
main mirroring
We use Kafka MirrorMaker for datacenter failover between the two main Kafka clusters in eqiad and codfw. These clusters mirror all of their datacenter prefixed topics to each other. Original topics in main-eqiad are prefixed with 'eqiad.', and original topics in main-codfw are prefixed with 'codfw.'. This allows whitelisting of the original topics in each source cluster. E.g. the MirrorMaker instance running on main-codfw nodes, named main-eqiad_to_main-codfw, consumes only topics that match eqiad.* from main-eqiad and produces them to main-codfw. The reciprocal MirrorMaker instance, main-codfw_to_main-eqiad, running on main-eqiad nodes, does the opposite.
Since mirroring here is primarily used for datacenter failover, a short (less than a few days) downtime will have no practical impact (as long as there is no primary data center switch during the downtime). After a primary datacenter switch, the codfw.* prefixed topics will start being used in main-codfw, instead of the eqiad.* ones in main-eqiad.
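To check whether a MirrorMaker instance is keeping up, you can look at its consumer group lag on the source cluster. A sketch: the exact group name is an assumption here, so list the groups first to find the right one.
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --list | grep -i mirror
# Then describe the group you found to see per-partition lag
KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group <mirrormaker_group>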
jumbo mirroring
We also use MirrorMaker to consume all of the topics from the main clusters to jumbo-eqiad for analytics and posterity purposes. The jumbo-eqiad nodes run a MirrorMaker instance called main-eqiad_to_jumbo-eqiad. This instance consumes almost all topics from main-eqiad, including the codfw.* prefixed ones that were mirrored by the main-codfw_to_main-eqiad MirrorMaker instance. These topics are then ingested into Hadoop for analytics usage.
Note: As of 2018-07, the job queue and change-prop topics are not needed for anything in the jumbo-eqiad cluster. These topics are bursty and higher volume than the other topics in the main Kafka clusters. We've had problems with MirrorMaker and some of these topics in the past. If there are MirrorMaker errors where MirrorMaker fails producing messages in one of these topics, it is safe to blacklist the topic.
Broker Migrations
Kafka-logging
The below migration process assumes a new host will be assuming the broker ID of the host being migrated away from.
note: the current/old broker will be referred to as "source_host" and the new host being migrated to will be referred to as "dest_host"
- Write/merge homer templates/cr/firewall.conf patch to include new broker(s) addresses
- Update broker list in eventgate-logging to include new broker(s) addresses
- helmfile.d/services/eventgate-logging-external/values-eqiad.yaml
- helmfile.d/services/eventgate-logging-external/values-codfw.yaml
- helmfile.d/services/eventgate-logging-external/values-staging.yaml
- helmfile.d/services/eventgate-logging-external/values.yaml
- Downtime source_host in icinga
- Disable notifications for dest_host (via hiera patch)
- Downtime icinga “under replicated partitions” checks on all members of the kafka cluster undergoing maintenance (e.g. all kafka-logging eqiad broker hosts)
- source_host: Disable puppet with message
- dest_host: Disable puppet with message
- source_host: stop & mask service kafka
- Gerrit/puppetmaster: Merge “kafka-logging: replace source_host broker with dest_host" (example: https://gerrit.wikimedia.org/r/c/operations/puppet/+/677009)
- Conf1*: run puppet agent
- Run puppet agent manually on remaining kafka cluster hosts
- run puppet agent on prometheus nodes
- run puppet agent on grafana nodes
- dest_host: enable puppet & run puppet agent
- Ensure Kafka and related services come up cleanly (watch the Grafana dashboard for this Kafka cluster, and watch consumer lag; it may be necessary to reload Grafana in the browser to pick up the new cluster hosts)
- Give the Kafka cluster time to sync and settle down (a quick check is sketched after this list)
- If the replica imbalance does not correct itself, issue a re-election with `kafka preferred-replica-election`
- Ensure dest_host is green in icinga
- Enable dest_host notifications via hiera (revert patch from step 4)
- run puppet on source_host
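A quick way to confirm that the cluster has settled after the migration (the flag is part of the standard kafka-topics tooling):
# Should print nothing once every partition has its full ISR again
kafka topics --describe --under-replicated-partitions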
Rebalance topic partitions to new brokers
One interesting use case for Kafka is rebalancing topic partitions in a cluster when new brokers are added to expand it. We have worked on this use case twice: T225005 and T255973.
Kafka by default does not move any partition from one broker to another unless explicitly told to do so via kafka reassign-partitions. The tool is very powerful and generic, but the operator needs to come up with a plan for which partitions to move (from broker A to broker B), so a tool like topicmappr's "rebuild" command may be useful to generate one. A plan is basically a list of json files (one for each topic) that can be given as input to kafka reassign-partitions. topicmappr supports multiple criteria for generating a plan, like pulling metrics from Prometheus, but for our use cases its default behavior leads to good results (it basically moves partitions evenly across a set of new brokers).
More details about how topicmappr's rebuild works: https://github.com/DataDog/kafka-kit/wiki/Rebuild-command
The overall idea is the following:
- Find a list of topics that you want to target, for example the ones having a certain minimum level of traffic (like >= 10 msgs/s). Something like this may be a good starting point.
- Use topicmappr to generate a plan, namely a list of json files (one for each topic/partition combination) that are readable by kafka reassign-partitions.
- Manually execute kafka reassign-partitions for all json files, one at a time. The command returns immediately once executed, but Kafka may take a long time to move the target partition to another broker (depending on size, throttling, etc.).
- Save the output of the above command, since it prints the current assignment; this can be used to roll back if needed (see below).
- Use the --throttle option of kafka reassign-partitions to limit the bandwidth used for data transfers (remember to add it every time you invoke the command, otherwise it will be reset to the default).
- Use the --verify option to periodically check the status of a rebalance. The --verify option also removes the throttle once the rebalance is complete, restoring the brokers to their previous state (otherwise other replication tasks might suffer as well).
Rebalance topic partitions
This section collects various tips for using kafka reassign-partitions. If you only need to move partitions because of new, empty brokers, please see the section above.
The tool takes as input one json file for each topic, formatted like the following:
{
"partitions": [
{"topic": "eqiad.resource-purge", "partition": 0, "replicas": [2001,2005,2003]},
{"topic": "eqiad.resource-purge", "partition": 1, "replicas": [2002,2004,2001]},
{"topic": "eqiad.resource-purge", "partition": 2, "replicas": [2003,2004,2002]},
{"topic": "eqiad.resource-purge", "partition": 3, "replicas": [2004,2001,2002]},
{"topic": "eqiad.resource-purge", "partition": 4, "replicas": [2005,2001,2002]}
],
"version":1
}
The idea is, for every partition, to instruct Kafka about which brokers should hold it. The tool should not be used to increase the number of partitions or their replicas, so it is essential to craft a json document that is consistent with the current topic state (number of partitions, and number of replicas for each partition). Shuffling where the replicas are placed helps redistribute traffic, since more brokers will be able to act as partition leaders and Kafka producers will be able to spread their traffic across more brokers. Caveat: the first replica listed is the one that will likely become the partition leader. An interesting use case could be this:
{
"partitions": [
{"topic": "eqiad.resource-purge", "partition": 0, "replicas": [2001,2005,2003]},
{"topic": "eqiad.resource-purge", "partition": 1, "replicas": [2002,2004,2001]},
{"topic": "eqiad.resource-purge", "partition": 2, "replicas": [2004,2003,2002]},
{"topic": "eqiad.resource-purge", "partition": 3, "replicas": [2004,2001,2002]},
{"topic": "eqiad.resource-purge", "partition": 4, "replicas": [2005,2001,2002]}
],
"version":1
}
Here only the partition 2 layout has changed (its first replica is different from above). If we apply this configuration, Kafka will likely assign leadership of partitions 2 and 3 to the same broker (2004) even if another broker would be a better choice (2003). Kafka balances partitions based on their number; it doesn't know much about how the operator wants to shape each broker's traffic, and it trusts the first replica to be a good indication of what the operator wants.
Once you have executed kafka reassign-partitions --execute, please run the same command with --verify until the partitions are reassigned (this also clears out any throttling setting, if applied).
Note: the --throttle option is really useful if you have big topics and you don't want to risk saturating the brokers' bandwidth. If applied, please remember the suggestion above about --verify.
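Putting it together, a typical invocation looks roughly like this (a sketch: the json file name and throttle value are just examples, and depending on the wrapper version you may need to pass the cluster connection options yourself):
# Start the reassignment described in the json file, throttling replication to ~50 MB/s
kafka reassign-partitions --reassignment-json-file eqiad.resource-purge.json --execute --throttle 50000000
# Re-run periodically until every partition reports the reassignment as completed;
# this also removes the throttle once done.
kafka reassign-partitions --reassignment-json-file eqiad.resource-purge.json --verify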
Rollback/Stop topic partitions rebalance
Please do this only if you are in a really messed up state. A partition move will likely cause some metrics to show temporary issues, like under replicated partitions, but eventually they auto-resolve. What should the operator do if something goes wrong while a partition is being moved? The solution found by the Analytics/DE team at the time is the following:
- Stop the current rebalancing in progress (since it blocks other ones)
# see https://phabricator.wikimedia.org/T255973#6762713
# This of course assuming main-eqiad/jumbo/etc..,
# please use codfw's zookeeper cluster when needed.
ssh conf1004.eqiad.wmnet
elukey@conf1004:~$ sudo -u zookeeper /usr/share/zookeeper/bin/zkCli.sh
rmr /kafka/test-eqiad/admin/reassign_partitions
rmr /kafka/test-eqiad/controller
The above will cause the Kafka controller (one broker elected at any given time) to lose its status, and the cluster will elect another one. From this point onward no more partitions will be reassigned.
- Roll back to the previous state (the assignment emitted by kafka-reassign-partitions when it was originally executed)
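The rollback itself is just another reassignment: save the current-assignment json that the original --execute run printed, and feed it back to the tool (the file name here is just an example):
kafka reassign-partitions --reassignment-json-file rollback-eqiad.resource-purge.json --execute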
Renew TLS certificate
The brokers of the Kafka clusters that we manage in production all use TLS certificates to secure traffic between each other and their clients. At the time of writing (January 2022) there are two main types of certificates in use:
- Puppet - all the brokers use the same TLS certificate created with cergen, and trust only the Puppet CA.
- PKI Kafka Intermediate - every broker has its own certificate (hostname based) issued by a special PKI intermediate CA.
The latter is preferred, but the effort to migrate all the clusters to it is still ongoing (see T291905 for more info). In order to check what certificate type is used by a broker, just ssh to it and run:
echo y | openssl s_client -connect $(hostname -f):9093 | openssl x509 -issuer -noout
If the CA mentioned is:
- the Puppet one, then you'll need to follow Cergen#Update_a_certificate and deploy the new certificate to all nodes.
- the Kafka PKI Intermediate one, then in theory a new certificate should be issued a few days before the expiry, and puppet should replace the Kafka keystore automatically (under /etc/kafka/ssl).
In both cases, a roll restart of the brokers is needed to force them to pick up the new certificates. Please use the related Kafka cookbook :)
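To check when the current certificate expires (and to confirm that the new one was picked up after the roll restart), something like this works on any broker:
echo | openssl s_client -connect $(hostname -f):9093 2>/dev/null | openssl x509 -noout -issuer -enddate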
Is there a way to reload the keystores without restarting?
In theory it should be sufficient to execute the kafka configs command on every broker to force a reload, but in practice this doesn't work with our version of Kafka, as described in https://phabricator.wikimedia.org/T299409