Full Example: https://github.com/shooeugenesea/study-practice/blob/kafka/src/main/java/examples/Main.java
Create a topic with Java
/**
 * Creates {@code topic} (1 partition, replication factor 1) unless the cluster already lists it.
 *
 * <p>Prints to standard output whether the topic already existed or is being created.
 * Both the listing call and the creation call wait at most 10 seconds.
 *
 * <p>NOTE(review): the existence check and the creation are two separate admin calls, so
 * another client could presumably create the topic in between — confirm whether that matters here.
 *
 * @param kafkaProps configuration handed to {@code AdminClient.create}
 * @param topic      name of the topic to look up and, if absent, create
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException   if the list or create operation fails
 * @throws TimeoutException     if either operation does not finish within 10 seconds
 */
private static void createTopicIfNotExist(Properties kafkaProps, String topic) throws InterruptedException, ExecutionException, TimeoutException {
    // try-with-resources closes the client on every exit path, including the
    // early return below and any exception thrown by the admin calls.
    try (AdminClient adminClient = AdminClient.create(kafkaProps)) {
        if (adminClient.listTopics().names().get(10, TimeUnit.SECONDS).contains(topic)) {
            System.out.println("Topic " + topic + " already exist");
            return;
        }
        System.out.println("Topic " + topic + " doesn't exist, create it");
        NewTopic newTopic = new NewTopic(topic, 1, /* replicationFactor */ (short) 1);
        List<NewTopic> newTopics = new ArrayList<>();
        newTopics.add(newTopic);
        adminClient.createTopics(newTopics).all().get(10, TimeUnit.SECONDS);
    }
}
Properties for Producer and Consumer
/**
 * Builds a single {@link Properties} object carrying both producer and consumer settings:
 * the bootstrap servers, String (de)serializers for keys and values, and the
 * consumer group id {@code "TestGroup"}.
 *
 * @param url value stored under the bootstrap-servers key
 * @return a new, populated {@code Properties} instance
 */
private static Properties kafkaProps(String url) {
    // Resolve the class names once; each is used for both the key and the value side.
    final String serializerName = StringSerializer.class.getName();
    final String deserializerName = StringDeserializer.class.getName();

    Properties config = new Properties();
    config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);

    // Producer side
    config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerName);
    config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerName);

    // Consumer side
    config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerName);
    config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerName);
    config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "TestGroup");

    return config;
}
Produce a message with Java
try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps)) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "mykey", "myval" + count);
System.out.println("offset:" + producer.send(record).get(10, TimeUnit.SECONDS).offset());
} catch (Throwable ex) {
ex.printStackTrace();
}
Consume messages with Java
// Subscribe to the topic and poll in an endless loop; the loop has no exit condition,
// so control only leaves it when something is thrown.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
    consumer.subscribe(Arrays.asList(topic));
    for (;;) {
        // Each poll waits up to 10 seconds for records.
        ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(10));
        int received = batch.count();
        if (received <= 0) {
            continue;
        }
        // One countDown() per received record.
        // NOTE(review): receiveCnt is declared outside this snippet — presumably a CountDownLatch; confirm.
        for (int i = 0; i < received; i++) {
            receiveCnt.countDown();
        }
        System.out.println("receive:" + received + " messages");
    }
} catch (Throwable failure) {
    failure.printStackTrace();
}
沒有留言:
張貼留言