Kafka Client assign and reset offset

 1. Producer send messages

```

private static void sendMessages(Properties kafkaProps, String topic) {

    for (int i = 0; i < MSG_COUNT; i++) {

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {

            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "mykey", "myval" + i);

            System.out.println("offset:" + producer.send(record).get(10, TimeUnit.SECONDS).offset());

        } catch (Throwable ex) {

            ex.printStackTrace();

        }

    }

}

```


2. Assign topic to consumer

```

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerProps("localhost:9092"))) {

            consumer.assign(consumer.partitionsFor(TOPIC).stream().map(p -> new TopicPartition(TOPIC, p.partition())).collect(Collectors.toList()));            

            resetOffset(consumer); // will NOT trigger java.lang.IllegalStateException: No current assignment for partition

```


3. Consume messages

```

private static int consumeAllMessages(KafkaConsumer<String, String> consumer) {

        int total = 0;

        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

            if (records.count() > 0) {

                total += records.count();

                System.out.println("receive:" + records.count() + " messages.");

            } else {

                System.out.println("receive no message");

                break;

            }

        }

        return total;

    }

```


4. Reset offset

```

private static void resetOffset(KafkaConsumer<String, String> consumer) {

        consumer.partitionsFor(TOPIC).forEach(p -> {

            System.out.println("Seek offset 0 in partition " + p.partition());

            consumer.seek(new TopicPartition(TOPIC, p.partition()), 0);

        });

    }

```


5. Consume all messages

```

private static int consumeAllMessages(KafkaConsumer<String, String> consumer) {

        int total = 0;

        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

            if (records.count() > 0) {

                total += records.count();

                System.out.println("receive:" + records.count() + " messages.");

            } else {

                System.out.println("receive no message");

                break;

            }

        }

        return total;

    }

```


Full code: 

沒有留言:

張貼留言

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...