1. Producer sends messages
```
/**
 * Sends {@code MSG_COUNT} records with key {@code "mykey"} and values
 * {@code "myval0"}, {@code "myval1"}, ... to the given topic, printing the
 * offset assigned to each record.
 *
 * <p>A single producer is created for the whole batch and closed when the
 * batch is done, instead of building and tearing down one producer per record.
 * A failure to send one record is printed and the loop moves on to the next.
 *
 * @param kafkaProps producer configuration passed to the {@code KafkaProducer} constructor
 * @param topic      name of the topic the records are sent to
 */
private static void sendMessages(Properties kafkaProps, String topic) {
    // One producer for all records: constructing a producer per message is costly.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
        for (int i = 0; i < MSG_COUNT; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "mykey", "myval" + i);
            try {
                // Block for up to 10 seconds so the assigned offset can be printed.
                System.out.println("offset:" + producer.send(record).get(10, TimeUnit.SECONDS).offset());
            } catch (InterruptedException ex) {
                // Restore the interrupt flag and stop sending; the caller asked us to stop.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
                return;
            } catch (Exception ex) {
                // Best effort: report the failed record and continue with the next one.
                ex.printStackTrace();
            }
        }
    } catch (Exception ex) {
        // Producer construction or close failed.
        ex.printStackTrace();
    }
}
```
2. Assign the topic's partitions to the consumer
```
// Create a consumer that is closed automatically when the try block exits.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerProps("localhost:9092"))) {
// Explicitly assign every partition that partitionsFor(TOPIC) reports to this consumer.
consumer.assign(consumer.partitionsFor(TOPIC).stream().map(p -> new TopicPartition(TOPIC, p.partition())).collect(Collectors.toList()));
// NOTE(review): seek() presumably requires the partition to be assigned already, which is
// why assign() is called before resetOffset() - confirm against the KafkaConsumer docs.
resetOffset(consumer); // will NOT trigger java.lang.IllegalStateException: No current assignment for partition
```
3. Consume messages
```
/**
 * Polls the consumer repeatedly, waiting up to five seconds per poll, until a
 * poll comes back with no records.
 *
 * @param consumer the consumer to poll
 * @return the total number of records received across all polls
 */
private static int consumeAllMessages(KafkaConsumer<String, String> consumer) {
    int received = 0;
    for (;;) {
        int batchSize = consumer.poll(Duration.ofSeconds(5)).count();
        // An empty batch is treated as "nothing left to read".
        if (batchSize <= 0) {
            System.out.println("receive no message");
            return received;
        }
        received += batchSize;
        System.out.println("receive:" + batchSize + " messages.");
    }
}
```
4. Reset offset
```
/**
 * Moves the consumer's position to offset 0 on every partition that
 * {@code partitionsFor(TOPIC)} reports, logging each seek before it happens.
 *
 * @param consumer the consumer whose positions are reset
 */
private static void resetOffset(KafkaConsumer<String, String> consumer) {
    consumer.partitionsFor(TOPIC)
            .stream()
            .mapToInt(info -> info.partition())
            .forEachOrdered(partition -> {
                System.out.println("Seek offset 0 in partition " + partition);
                TopicPartition target = new TopicPartition(TOPIC, partition);
                consumer.seek(target, 0);
            });
}
```
5. Consume all messages
```
/**
 * Drains the consumer: keeps polling (five-second timeout per poll) while polls
 * return records, and stops at the first poll that returns none.
 *
 * @param consumer the consumer to poll
 * @return how many records were received in total
 */
private static int consumeAllMessages(KafkaConsumer<String, String> consumer) {
    int total = 0;
    int batch;
    // Loop ends on the first poll that yields zero records.
    while ((batch = consumer.poll(Duration.ofSeconds(5)).count()) > 0) {
        total += batch;
        System.out.println("receive:" + batch + " messages.");
    }
    System.out.println("receive no message");
    return total;
}
```
沒有留言:
張貼留言