Kafka client: assign partitions and reset offsets

1. Producer sends messages

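```
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

private static void sendMessages(Properties kafkaProps, String topic) {
    // Create the producer once and reuse it for every message;
    // try-with-resources closes it when the block exits.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
        for (int i = 0; i < MSG_COUNT; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "mykey", "myval" + i);
            // Wait up to 10 seconds for the broker acknowledgement, then print the record's offset.
            System.out.println("offset:" + producer.send(record).get(10, TimeUnit.SECONDS).offset());
        }
    } catch (Exception ex) {
        // The first failed send stops the loop.
        ex.printStackTrace();
    }
}
```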
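
The `kafkaProps` argument is not shown in this post. As a rough sketch, it could be built like this. The helper name `kafkaProducerProps`, the class name, and the `acks` value are assumptions for illustration; the property keys and serializer class names are standard Kafka producer settings.

```java
import java.util.Properties;

class ProducerPropsSketch {

    // Hypothetical helper: builds the Properties passed to sendMessages as kafkaProps.
    static Properties kafkaProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Keys and values are plain strings, matching KafkaProducer<String, String>.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: wait for all in-sync replicas to acknowledge each record.
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration, one key=value pair per line.
        kafkaProducerProps("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With a helper like this, the call would look like `sendMessages(kafkaProducerProps("localhost:9092"), TOPIC)`.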


2. Assign the topic's partitions to the consumer

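```
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerProps("localhost:9092"))) {
    // Manually assign every partition of TOPIC to this consumer.
    consumer.assign(consumer.partitionsFor(TOPIC).stream()
            .map(p -> new TopicPartition(TOPIC, p.partition()))
            .collect(Collectors.toList()));

    // The partitions are already assigned, so seek() will NOT trigger
    // java.lang.IllegalStateException: No current assignment for partition.
    // With subscribe(), seeking before poll() has assigned partitions would throw it.
    resetOffset(consumer);
}
```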
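
The `kafkaConsumerProps` helper is called above but not shown in this post. A minimal sketch of what it might contain follows. The class name, the `group.id` value, and the choice to disable auto-commit are assumptions, not the original settings; the property keys and deserializer class names are standard Kafka consumer settings.

```java
import java.util.Properties;

class ConsumerPropsSketch {

    // Sketch of the helper used above; the real implementation is not shown in the post.
    static Properties kafkaConsumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Keys and values are plain strings, matching KafkaConsumer<String, String>.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Hypothetical group name, chosen only for this example.
        props.put("group.id", "offset-reset-demo");
        // Assumption: positions are controlled by seek(), so nothing is committed automatically.
        props.put("enable.auto.commit", "false");
        // Applies only when the consumer has no valid position for a partition.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration, one key=value pair per line.
        kafkaConsumerProps("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Disabling auto-commit is a design choice for this kind of test: the read position comes from explicit `seek` calls, so committed offsets play no role.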


3. Consume messages

```
private static int consumeAllMessages(KafkaConsumer<String, String> consumer) {
    int total = 0;
    while (true) {
        // Wait up to 5 seconds for the next batch.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        if (records.count() > 0) {
            total += records.count();
            System.out.println("receive:" + records.count() + " messages.");
        } else {
            // An empty poll is treated as the end of the topic.
            System.out.println("receive no message");
            break;
        }
    }
    return total;
}
```


4. Reset offset

```
private static void resetOffset(KafkaConsumer<String, String> consumer) {
    // Rewind every partition of TOPIC so the next poll() starts at offset 0.
    consumer.partitionsFor(TOPIC).forEach(p -> {
        System.out.println("Seek offset 0 in partition " + p.partition());
        consumer.seek(new TopicPartition(TOPIC, p.partition()), 0);
    });
}
```


5. Consume all messages again

After the reset, call the same `consumeAllMessages(consumer)` method shown in step 3. Because every partition was rewound, the consumer reads each partition again from offset 0.


Full code: 
