Deep understanding of Kafka principle: implementing Kafka producer in Java

zhengzaifeidelushang 2022-01-26 21:00:54 阅读数:599

deep understanding kafka principle implementing

One 、 introduce pom.xml rely on

 <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.2</version>
</dependency>

Two 、java Realization kafka producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MyProducer {

private final static String TOPIC_NAME = "optics-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {

Properties props = new Properties();
// Set up kafka Cluster address 
props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username=\"debezium\" password=\"NGFlM2I1NTJlNmFk\";");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
//ack Pattern ,all Is the slowest but safest 
props.put("acks", "-1");
// Failed retries 
props.put("retries", 0);
// Total byte size of unsent messages per partition ( Company : byte ), If it exceeds the set value, it will submit data to the server 
props.put("batch.size", 10);
//props.put("max.request.size",10);
// How long does the message stay in the buffer , If the value exceeds the set value, it will be submitted to the server 
props.put("linger.ms", 10000);
// Whole Producer Total memory used , If the buffer is full, data will be submitted to the server 
//buffer.memory Be greater than batch.size, Otherwise, an error of insufficient memory will be reported 
props.put("buffer.memory", 10240);
// Serializer 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//key: The function is to decide which partition to send ,value: Specific message content to be sent 
for (int i = 0; i < 10; i++) {

RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, Integer.toString(i), "dd:" + i)).get();
System.out.println(" Send message results in synchronous mode :" + "topic name :" + metadata.topic() + " | partition Partition :" + metadata.partition() + " | offset Offset :" + metadata.offset());
}
}
}

The output is as follows :

 Send message results in synchronous mode :topic name :optics-topic | partition Partition :0 | offset Offset :6
Send message results in synchronous mode :topic name :optics-topic | partition Partition :2 | offset Offset :8
Send message results in synchronous mode :topic name :optics-topic | partition Partition :2 | offset Offset :9
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :6
Send message results in synchronous mode :topic name :optics-topic | partition Partition :0 | offset Offset :7
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :7
Send message results in synchronous mode :topic name :optics-topic | partition Partition :0 | offset Offset :8
Send message results in synchronous mode :topic name :optics-topic | partition Partition :0 | offset Offset :9
Send message results in synchronous mode :topic name :optics-topic | partition Partition :2 | offset Offset :10

3、 ... and 、 Send message to specified partition

 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//key: The function is to decide which partition to send ,value: Specific message content to be sent 
for (int i = 0; i < 10; i++) {

RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, 1,Integer.toString(i), "dd:" + i)).get();
System.out.println(" Send message results in synchronous mode :" + "topic name :" + metadata.topic() + " | partition Partition :" + metadata.partition() + " | offset Offset :" + metadata.offset());
}

The output is as follows :

 Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :9
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :10
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :11
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :12
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :13
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :14
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :15
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :16
Send message results in synchronous mode :topic name :optics-topic | partition Partition :1 | offset Offset :17
copyright:author[zhengzaifeidelushang],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/01/202201262100521761.html