Kafka Client Java Example

 Full Example: https://github.com/shooeugenesea/study-practice/blob/kafka/src/main/java/examples/Main.java


Create topic by Java
private static void createTopicIfNotExist(Properties kafkaProps, String topic) throws InterruptedException, ExecutionException, TimeoutException {
AdminClient adminClient = AdminClient.create(kafkaProps);
NewTopic newTopic = new NewTopic(topic, 1, /* replicationFactor */ (short)1);
if ( adminClient.listTopics().names().get(10, TimeUnit.SECONDS).contains(topic) ) {
System.out.println("Topic " + topic + " already exist");
return;
}
System.out.println("Topic " + topic + " doesn't exist, create it");
List<NewTopic> newTopics = new ArrayList<>();
newTopics.add(newTopic);
adminClient.createTopics(newTopics).all().get(10, TimeUnit.SECONDS);
adminClient.close();
}

Properties for Producer and Consumer
private static Properties kafkaProps(String url) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGroup");

return properties;
}

Produce message by Java
try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps)) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "mykey", "myval" + count);
System.out.println("offset:" + producer.send(record).get(10, TimeUnit.SECONDS).offset());
} catch (Throwable ex) {
ex.printStackTrace();
}

Consume message by Java
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps)) {
consumer.subscribe(Arrays.asList(topic));
ConsumerRecords<String, String> records = null;
while (true) {
records = consumer.poll(Duration.ofSeconds(10));
if (records.count() > 0) {
IntStream.range(0, records.count()).forEach(i -> receiveCnt.countDown());
System.out.println("receive:" + records.count() + " messages");
}
}
} catch (Throwable ex) {
ex.printStackTrace();
}



沒有留言:

張貼留言

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...