[Kafka operation and maintenance] Do you really understand data migration? (teaching video attached)


Daily operations and troubleshooting: Didi's open-source LogiKM, a one-stop Kafka monitoring and management platform

【Kafka operations】Data migration, partition replica reassignment, cross-path migration, replica expansion and reduction

If you don't want to read the article, you can watch the accompanying video directly (follow-up videos will be uploaded to the official account, CSDN, Bilibili and other platforms under the same name: shizhenzhen's grocery store).

Current video content:

  1. Partition replica reassignment tutorial
  2. Things to watch out for during reassignment
  3. Using LogiKM to simplify the reassignment process and make it more efficient
  4. Cross-path replica migration

Video link:

【Kafka operations】Partition replica reassignment + things to watch out for + using LogiKM to simplify the operation

Script parameters

| Parameter | Description | Example |
| --- | --- | --- |
| --zookeeper | ZooKeeper connection string | --zookeeper localhost:2181,localhost:2182 |
| --topics-to-move-json-file | Specify a JSON file listing the topics whose partitions are to be reassigned (file format shown in section 1.1) | --topics-to-move-json-file config/move-json-file.json |
| --generate | Generate a proposed replica reassignment plan; nothing is actually executed | |
| --broker-list | The brokers to spread replicas over when generating a proposed plan; used together with --generate | --broker-list 0,1,2,3 |
| --reassignment-json-file | Specify the JSON file describing the reassignment to perform (file format shown in section 1.2); used together with --execute | --reassignment-json-file config/reassignment-json-file.json |
| --execute | Start the reassignment task; used together with --reassignment-json-file | |
| --verify | Check whether the reassignment completed successfully. If --throttle was used, this command also removes the throttle. This is important: if the throttle is not removed, it will affect normal synchronization between replicas | |
| --throttle | Limit on the rate of data transfer between brokers during migration, in bytes/sec | --throttle 500000 |
| --replica-alter-log-dirs-throttle | Throttle for intra-broker cross-path replica migration, i.e. the bandwidth for copying data from one log directory to another, in bytes/sec | --replica-alter-log-dirs-throttle 100000 |
| --disable-rack-aware | Disable rack awareness; rack information is ignored during assignment | |
| --bootstrap-server | Broker address; required for cross-path replica migration | --bootstrap-server xxxxx:9092 |

1. Introduction to the script

kafka-reassign-partitions.sh is the script Kafka provides for reassigning partitions.

1.1 Generate a recommended assignment

The key parameter is --generate.

Before reassigning partition replicas, it is best to obtain a reasonable assignment plan in the following way.

Write a move-json-file.json file; this file describes which topics you want to compute a reassignment for:

{
  "topics": [
    {"topic": "test_create_topic1"}
  ],
  "version": 1
}
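If you want to generate a plan for several topics at once, the topics array can simply list them all. A minimal sketch (the second topic name is only an illustration, not from this cluster):

{
  "topics": [
    {"topic": "test_create_topic1"},
    {"topic": "test_create_topic2"}
  ],
  "version": 1
}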

Then run the following script; the --broker-list "0,1,2,3" parameter lists the brokers you want to spread the replicas over:

sh bin/kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list "0,1,2,3" --generate

After execution, it prints:

Current partition replica assignment// Current replica allocation method
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]}
Proposed partition reassignment configuration// Desired redistribution
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}

Note that at this point no partitions have actually been moved; the command only tells you the current assignment and a proposed one. Save the current assignment in case you need to roll back later.
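As a sketch of that advice (the file name rollback.json is just an example), save the printed "Current partition replica assignment" JSON to a file; running --execute with it later would restore the original layout:

echo '{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]}' > config/rollback.json

# only if you need to undo the reassignment later:
sh bin/kafka-reassign-partitions.sh --zookeeper xxx:2181 --reassignment-json-file config/rollback.json --execute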

1.2 Execute the JSON file

The key parameter is --execute.

Save the proposed reassignment obtained above into a JSON file, e.g. reassignment-json-file.json:

{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}

And then execute

sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute

During the migration, pay attention to the impact of the sudden increase in traffic on the cluster.

Kafka provides a throttle for inter-broker replication traffic that limits the bandwidth used to move a replica from one machine to another. This is very useful when rebalancing the cluster, bootstrapping a new broker, or adding and removing brokers, because it caps these intensive data operations and limits their impact on users.

For example, we add a throttle option to the migration above: --throttle 50000000

> sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute --throttle 50000000

With the --throttle 50000000 parameter added, the partition movement is limited to 50000000 B/s.

With the parameter added, you can see:

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

Note that if your migration includes cross-path replica migration (multiple log directories on the same broker), this throttle does not cover it; you also need the --replica-alter-log-dirs-throttle parameter, which throttles migration between different paths on the same broker.

If you want to raise the limit during the rebalance to get more throughput and finish faster, you can run the execute command again with the same reassignment-json-file.
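For example, a sketch of raising the throttle on the in-flight reassignment (the new value is only an illustration):

sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute --throttle 100000000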

1.3 Verification

The key parameter is --verify.

This option checks the status of the partition reassignment, and at the same time removes the --throttle limit; otherwise, normal replication traffic could remain throttled.

sh bin/kafka-reassign-partitions.sh --zookeeper xxxx:2181 --reassignment-json-file config/reassignment-json-file.json --verify


Note: if one of the BrokerIds you entered does not exist, the reassignment of the affected replica fails, but the other replicas are not affected.


2. Replica expansion and reduction

Kafka does not provide a dedicated script for expanding or shrinking the number of replicas, unlike kafka-topics.sh, which can expand partitions. To change the replica count you have to take an indirect route and use kafka-reassign-partitions.sh to reassign the replicas.

2.1 Replica expansion

Suppose the topic currently has 3 partitions with 1 replica each; to improve availability, we want to increase the replica count to 2.

2.1.1 Work out the replica assignment

Use --generate as in step 1.1 to obtain the current assignment, which gives the following JSON:

{
  "version": 1,
  "partitions": [{
    "topic": "test_create_topic1",
    "partition": 2,
    "replicas": [2],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 1,
    "replicas": [1],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 0,
    "replicas": [0],
    "log_dirs": ["any"]
  }]
}

We want every partition to have 2 replicas, so we only need to modify the value of "replicas": []. It is a list of broker ids, and the first entry is the preferred leader. Following the distribution we want, we change the JSON file to the following:

{
  "version": 1,
  "partitions": [{
    "topic": "test_create_topic1",
    "partition": 2,
    "replicas": [2,0],
    "log_dirs": ["any","any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 1,
    "replicas": [1,2],
    "log_dirs": ["any","any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 0,
    "replicas": [0,1],
    "log_dirs": ["any","any"]
  }]
}

Note that the number of entries in log_dirs must match the number of entries in replicas, or you can simply delete the log_dirs field altogether. log_dirs holds the absolute target paths used when a replica is migrated across paths (see section 5).

2.1.2 Execute with --execute

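For reference, a sketch of the execute step, assuming the expanded-replica plan above has been saved as config/expand-replicas.json (the file name is an assumption):

sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/expand-replicas.json --execute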

If you want to modify the throttle during the rebalance to increase throughput and finish faster, you can run the execute command again with the same reassignment-json-file.

2.1.3 Verify with --verify


After this completes, the replica count has been increased.

2.2 Replica reduction

Shrinking replicas works the same way as expanding them: when the new assignment contains fewer replicas than before, the extra replicas are deleted.

For example, having just added a replica, suppose we want to go back to a single replica.

Execute with the following JSON file:

{
  "version": 1,
  "partitions": [{
    "topic": "test_create_topic1",
    "partition": 2,
    "replicas": [2],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 1,
    "replicas": [1],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 0,
    "replicas": [0],
    "log_dirs": ["any"]
  }]
}

After execution you can see that the extra replicas are marked for deletion; they are cleaned up after a short while.
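To confirm the result, a sketch of checking the topic's replica list afterwards with kafka-topics.sh (the address is a placeholder):

sh bin/kafka-topics.sh --zookeeper xxxx:2181 --describe --topic test_create_topic1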


Although this lets us expand and shrink replicas, we have to work out the replica distribution ourselves to achieve load balancing and so on, and it is certainly not as sound as having Kafka compute the assignment for us automatically. So is there a good way to generate a reasonable assignment JSON file?

PS: we previously analyzed, in 【kafka source code】Partition and replica assignment rules when creating a Topic, how Kafka assigns partitions and replicas at topic creation. Would it be OK to compute the reassignment using those same rules?

--generate essentially ends up calling the same method: AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)

For a concrete implementation, see 【kafka reflection】A minimum-cost design for replica expansion and reduction.

You could write a project that implements a similar approach; if that is too much trouble, you can simply use LogiKM:

LogiKM's replica-change feature will do this for you directly (to be implemented in a future release).

3. Partition expansion

Kafka's partition expansion is implemented by the kafka-topics.sh script; shrinking the partition count is not supported.

For partition expansion, see 【kafka source code】TopicCommand alter source code analysis (partition expansion).
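For reference, a minimal sketch of expanding a topic's partition count with kafka-topics.sh (topic name and target count are only illustrations):

sh bin/kafka-topics.sh --zookeeper xxxx:2181 --alter --topic test_create_topic1 --partitions 4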

4. Partition migration

Partition migration works the same way as described above; see sections 1.1, 1.2 and 1.3.

5. Cross-path replica migration

Why is disk usage on production Kafka machines often uneven, with one disk far fuller than the others? Kafka only ensures that the number of partitions is evenly distributed across the disks; it cannot know the actual space each partition occupies, so a few heavy partitions may take up a large amount of space on one disk. Before version 1.1 there was nothing users could do about this, because Kafka only supported redistributing partition data between brokers and could not redistribute it between different disks on the same broker. Since version 1.1, migrating replicas between different paths is officially supported.

How do you store partitions under multiple paths on one broker?

Just list multiple directories in the configuration:

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=kafka-logs-5,kafka-logs-6,kafka-logs-7,kafka-logs-8
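In practice, each entry is usually an absolute path on a different physical disk; a sketch (the paths are assumptions, not from this setup):

log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs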

Note that different paths on the same broker only hold different partitions; replicas of the same partition are never placed on the same broker, otherwise the extra replica would be meaningless, since a single broker failure would take out all copies.

How do you specify a cross-path migration?

The migration JSON file has a log_dirs parameter. If it is not supplied, it defaults to "log_dirs": ["any"] (the number of entries in this array must match the number of replicas).

To migrate across paths, simply fill in the absolute target paths, as in the following example migration JSON file:

{
  "version": 1,
  "partitions": [{
    "topic": "test_create_topic4",
    "partition": 2,
    "replicas": [0],
    "log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-5"]
  }, {
    "topic": "test_create_topic4",
    "partition": 1,
    "replicas": [0],
    "log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-6"]
  }]
}

Then execute the script

sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx --reassignment-json-file config/reassignment-json-file.json --execute --bootstrap-server xxxxx:9092 --replica-alter-log-dirs-throttle 10000

Note: for cross-path migration, the --bootstrap-server parameter must be supplied.

If throttling is needed, add the --replica-alter-log-dirs-throttle parameter. It differs from --throttle in that --replica-alter-log-dirs-throttle limits the traffic of migrations between different paths within a broker.

Source code analysis

For the source code analysis, see the article:

【kafka source code】ReassignPartitionsCommand source code analysis (replica expansion, data migration, partition reassignment, cross-path replica migration)


copyright:author[Shizhenzhen's grocery store [official account of the same name]],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/175/20210726162029384I.html