
Spring Cloud Stream Introduction - 1

Goal
Describes how to apply Spring Cloud Stream with Kafka, and the concepts behind it.
Aimed at working with Kafka only; advanced topics are not covered.

Example

Concepts
  • An application's structure: external system (middleware) -> input -> application process -> output -> external system
  • In Spring Cloud Stream
    • A Binder handles the details of the external system, e.g. spring-cloud-stream-binder-kafka
    • A Binding handles the input (and output) of the application

Example
  • Setup
    • Spring Boot Application
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

Kafka topic consumer
  • Consumer method (note! the method name status is the Binding name, and it affects application.properties)
@Bean
public Consumer<String> status() {
    return status -> System.out.println("Received " + status);
}
# If messages are consumed from multiple topics, the functions must be listed in the spring.cloud.function.definition attribute, separated by semicolons. Here we declare just one.
spring.cloud.function.definition=status

# Note the pattern here: spring.cloud.stream.bindings.{bindingName}-in-0.destination, which specifies the topic name
# "in" means this is an input binding
# "0" means it is the first input of this binding
spring.cloud.stream.bindings.status-in-0.destination=status
  • Now you can send messages to the status topic (a producer sketch follows below)
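A minimal sketch of sending a test message with the plain Kafka producer client; the class name, broker address localhost:9092, and the "hello" payload are assumptions for illustration, and the properties mirror the full client example later on this page.

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendStatusMessage {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send one message to the "status" topic; the Consumer bean above should print it
            producer.send(new ProducerRecord<>("status", "hello")).get(10, TimeUnit.SECONDS);
        }
    }
}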

Function
A Function handles input -> process -> output.
  • Function: after receiving a message, append a random suffix at the end and return it
@Bean
public Function<String, String> randomNumberSuffix() {
    return val -> val + " => append suffix " + Math.random();
}
  • Specify the input & output topics in application.properties
# Note that we have added a second binding at this point
spring.cloud.function.definition=status;randomNumberSuffix

# The input topic of randomNumberSuffix is randomNumberSuffix, and the output topic is status, i.e. another binding of the same application
spring.cloud.stream.bindings.randomNumberSuffix-in-0.destination=randomNumberSuffix
spring.cloud.stream.bindings.randomNumberSuffix-out-0.destination=status
  • Send messages to the randomNumberSuffix topic

Supplier
A Supplier is polled automatically by the framework, once per second by default. A cron expression can also be specified; custom per-binding polling configuration is only supported from version 3.2 onward.
  • Supplier
@Bean
public Supplier<Date> mydate() {
    return () -> new Date();
}
  • Specify the output topic for the mydate Supplier binding
# We have now added a third binding definition
spring.cloud.function.definition=status;randomNumberSuffix;mydate

# The output topic is status, i.e. a topic of the same app
spring.cloud.stream.bindings.mydate-out-0.destination=status

# Change the default poller config to poll once every 2 seconds
spring.cloud.stream.poller.fixed-delay=2000
  • The Supplier starts running automatically as soon as the app starts (a cron-based poller sketch follows below)
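A hedged properties sketch of switching the poller to a cron schedule instead of a fixed delay; the property name follows the Spring Cloud Stream poller properties, but the exact name and the schedule value are assumptions and may differ between versions.

# Assumed poller property: trigger the Suppliers every 10 seconds via a cron expression
spring.cloud.stream.poller.cron=0/10 * * * * *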

Other Concepts
  • Consumer Group
    • Same concept as a Kafka consumer group
    • Within one group, only one consumer receives a given message
    • Consumers in different groups all receive the message
    • By default, every consumer gets its own group (an anonymous group)
    • Specify the group name via {binding}.group=xxx (see the properties sketch below)
  • Durability
    • When a group is specified, the consumer's subscription is retained even while the group has no consumers; once a consumer comes back it continues receiving messages
    • Subscriptions of anonymous groups are not retained, so anonymous groups are prone to receiving duplicated messages
  • Partition
    • A topic can be split into multiple partitions; each partition is consumed by one fixed consumer (within a group)
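A small properties sketch, assuming the bindings from the example above; the property names follow the standard spring.cloud.stream.bindings pattern, while the concrete values (status-group, payload, 3) are made up for illustration.

# Put all instances of this app into the same consumer group, so each message is handled by only one instance
spring.cloud.stream.bindings.status-in-0.group=status-group

# Illustrative producer-side partitioning for an output binding
spring.cloud.stream.bindings.randomNumberSuffix-out-0.producer.partition-key-expression=payload
spring.cloud.stream.bindings.randomNumberSuffix-out-0.producer.partition-count=3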

 

Kafka - Pick a transaction.id

Intention: why Kafka transactions are needed
  • After receiving and processing a message, the application needs to send messages out to multiple topics.
  • If any error occurs, none of those outgoing messages should be sent.
  • In addition, the originally consumed message should not be acknowledged either.

How it works
  1. The producer calls initTransactions with a transaction.id
  2. The TransactionCoordinator closes any existing pending transaction with the same transaction.id
  3. The producer sends messages; they are written to the topic partitions
  4. The producer calls commitTransaction, and the TransactionCoordinator starts the two-phase commit process
    1. Write PREPARE_COMMIT to the "transactionLog" topic
    2. Write "commit" markers to the topic partitions
    3. Write COMMITTED to the transactionLog
  5. After these steps the transaction is essentially finished
Note: the transactionLog is an internal topic that uses transaction.id as its partition key, which guarantees the ordering of state changes
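A minimal sketch of this lifecycle with the plain Kafka producer API (initTransactions / beginTransaction / commitTransaction / abortTransaction); the class name, broker address, transaction.id value "tx-demo", and the topic names are assumptions for illustration.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalSendDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-demo"); // transaction.id (illustrative value)

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();              // step 1: register transaction.id with the coordinator
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("topicA", "value-a")); // step 3: written to the partitions
                producer.send(new ProducerRecord<>("topicB", "value-b"));
                producer.commitTransaction();         // step 4: coordinator runs the two-phase commit
            } catch (ProducerFencedException fatal) {
                // another producer with the same transaction.id took over; this producer can only be closed
                throw fatal;
            } catch (Exception e) {
                producer.abortTransaction();          // abort: none of the messages become visible
                throw e;                              // to read_committed consumers
            }
        }
    }
}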

If two producers share a transaction.id, Kafka closes the earlier transaction as soon as it sees the same transaction.id again.
So if transaction.id values are not planned carefully, you will run into transactions being mysteriously aborted.

Atomic Read-Process-Write
  • After consuming a message, the consumer marks it as processed by committing the offset
  • That offset is in fact also stored in a topic
  • With the transaction mechanism described above, the "commitOffset" becomes just another message sent to a topic, so it can be wrapped into the same transaction (see the sketch below)
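Building on the producer sketch above, the consumed offsets can be added to the same transaction via sendOffsetsToTransaction, making the offset commit and the outgoing messages atomic. This fragment assumes a consumer, its polled records, and the transactional producer are in scope, and that a recent Kafka client (with consumer.groupMetadata()) is used; the types come from org.apache.kafka.clients.consumer and org.apache.kafka.common.

// inside beginTransaction()/commitTransaction() of the sketch above;
// "consumer", "producer" and the polled "records" (ConsumerRecords<String, String>) are assumed to be in scope
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); // position to commit = last processed + 1
}
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); // offset commit joins the transaction
producer.commitTransaction();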

Choosing an appropriate transaction.id
  • The transaction.id must be unique enough so that producers do not share one and get each other's transactions closed
  • Every transaction needs some extra requests to complete, so a transaction.id that is too fine-grained creates a flood of transactions and can hurt performance significantly
  • tx-{consumeTopic}-{consumePartition} is a reasonable compromise, because one consumer corresponds to exactly one topic and partition.
    In the read-process-write pattern this transaction.id is used only by the consumer of that topic & partition, so there is no multi-threaded producer with the same transaction.id.
    It is also not overly fragmented: in the same read-process-write pattern a consumer thread processes messages one by one, and each consumed message deserves its own transaction (id).

KafkaTemplate (through its producer factory in Spring Kafka) has a setting, producerPerConsumerPartition, that is used to build transaction.id values like tx-{consumeTopic}-{consumePartition}.

Its approach is to record the topic & partition when a message is received, and append them to the transaction.id prefix when messages are sent out (a configuration sketch follows below).
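A hedged sketch of wiring this up, assuming Spring Kafka's DefaultKafkaProducerFactory; the prefix value "tx-" and the producerConfigs parameter are illustrative, and the exact suffix format generated per consumer topic/partition may vary by Spring Kafka version.

import java.util.Map;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

// producerConfigs is assumed to hold the usual bootstrap server and serializer settings
public DefaultKafkaProducerFactory<String, String> transactionalProducerFactory(Map<String, Object> producerConfigs) {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs);
    factory.setTransactionIdPrefix("tx-");           // enables transactions; this becomes the transaction.id prefix
    factory.setProducerPerConsumerPartition(true);   // when producing from a listener, use a dedicated producer
                                                     // (and transaction.id suffix) per consumer topic/partition
    return factory;
}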

SpringBoot + Flyway + Kafka + PostgreSQL + Testcontainers

Source Code: https://github.com/axxdeveloper/study-practice/tree/testcontainer

In a sharing session I showed how to use Testcontainers to help develop and test the Kafka & PostgreSQL related logic of a Spring Boot application.

Using Testcontainers itself is not a problem; the tricky part is that after multiple test classes start and stop Kafka & Postgres, Spring Boot has to re-establish its connections, which is where @DirtiesContext comes in (a sketch follows below).
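A minimal sketch of this setup, assuming the usual Testcontainers and Spring Boot test dependencies; the container images, the properties wired via @DynamicPropertySource, and the test class name are assumptions, not the code from the repository linked above.

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest
@Testcontainers
@DirtiesContext // discard the Spring context after this class, so the next class reconnects to fresh containers
class KafkaPostgresIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:14");

    @DynamicPropertySource
    static void props(DynamicPropertyRegistry registry) {
        // point the application at the per-class containers
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
    }

    @Test
    void contextLoads() {
        // application beans can now talk to the Kafka & Postgres containers
    }
}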

 


Start Kafka by docker-compose

  1. docker-compose.yml
services:  
  zookeeper-server:
    image: bitnami/zookeeper:latest
    ports:
    - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    
  kafka-server:
    image: bitnami/kafka:latest
    ports:
    - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

  2. Start Kafka
docker-compose up -d
  3. Stop Kafka
docker-compose down
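Optionally, a quick way to verify that the broker is reachable is to list topics with the Java AdminClient; this sketch assumes the broker advertised on localhost:9092 as configured above, and the class name is illustrative.

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // prints the topic names currently known to the broker
            System.out.println(admin.listTopics().names().get(10, TimeUnit.SECONDS));
        }
    }
}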

Kafka Consumer specify offset

1. Producer send messages

```
private static void sendMessages(Properties kafkaProps, String topic) {
    for (int i = 0; i < MSG_COUNT; i++) {
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "mykey", "myval" + i);
            System.out.println("offset:" + producer.send(record).get(10, TimeUnit.SECONDS).offset());
        } catch (Throwable ex) {
            ex.printStackTrace();
        }
    }
}
```


2. Subscribe topic

```
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerProps("localhost:9092"))) {
    consumer.subscribe(Arrays.asList(TOPIC));
    // ... steps 3-5 below run inside this try block (see the full code linked at the bottom)
```


3. Consume messages

```
private static int consumeAllMessages(KafkaConsumer<String, String> consumer) {
    int total = 0;
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        if (records.count() > 0) {
            total += records.count();
            System.out.println("receive:" + records.count() + " messages.");
        } else {
            System.out.println("receive no message");
            break;
        }
    }
    return total;
}
```


4. Reset offset

```
private static void resetOffset(KafkaConsumer<String, String> consumer) {
    // seek() only works on partitions already assigned to this consumer,
    // which is the case here because step 3 has already polled messages
    consumer.partitionsFor(TOPIC).forEach(p -> {
        System.out.println("Seek offset 0 in partition " + p.partition());
        consumer.seek(new TopicPartition(TOPIC, p.partition()), 0);
    });
}
```


5. Consume all messages

```
// same consumeAllMessages method as in step 3; after resetOffset it reads everything again from offset 0
private static int consumeAllMessages(KafkaConsumer<String, String> consumer) {
    int total = 0;
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        if (records.count() > 0) {
            total += records.count();
            System.out.println("receive:" + records.count() + " messages.");
        } else {
            System.out.println("receive no message");
            break;
        }
    }
    return total;
}
```


Full code: ConsumerSubscribeAndAssignOffsetLaterMain.java



Kafka Client Java Example

 Full Example: https://github.com/shooeugenesea/study-practice/blob/kafka/src/main/java/examples/Main.java


Create topic by Java
private static void createTopicIfNotExist(Properties kafkaProps, String topic) throws InterruptedException, ExecutionException, TimeoutException {
    // try-with-resources ensures the AdminClient is closed even when the topic already exists
    try (AdminClient adminClient = AdminClient.create(kafkaProps)) {
        if (adminClient.listTopics().names().get(10, TimeUnit.SECONDS).contains(topic)) {
            System.out.println("Topic " + topic + " already exists");
            return;
        }
        System.out.println("Topic " + topic + " doesn't exist, create it");
        NewTopic newTopic = new NewTopic(topic, /* numPartitions */ 1, /* replicationFactor */ (short) 1);
        List<NewTopic> newTopics = new ArrayList<>();
        newTopics.add(newTopic);
        adminClient.createTopics(newTopics).all().get(10, TimeUnit.SECONDS);
    }
}

Properties for Producer and Consumer
private static Properties kafkaProps(String url) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGroup");
    return properties;
}

Produce message by Java
try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps)) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "mykey", "myval" + count);
    System.out.println("offset:" + producer.send(record).get(10, TimeUnit.SECONDS).offset());
} catch (Throwable ex) {
    ex.printStackTrace();
}

Consume message by Java
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps)) {
    consumer.subscribe(Arrays.asList(topic));
    ConsumerRecords<String, String> records = null;
    while (true) {
        records = consumer.poll(Duration.ofSeconds(10));
        if (records.count() > 0) {
            IntStream.range(0, records.count()).forEach(i -> receiveCnt.countDown());
            System.out.println("receive:" + records.count() + " messages");
        }
    }
} catch (Throwable ex) {
    ex.printStackTrace();
}



Run Kafka by docker

 https://itnext.io/how-to-install-kafka-using-docker-a2b7c746cbdc


  1. Create network
$ sudo docker network create kafka-net --driver bridge

  2. Install the ZooKeeper container
$ sudo docker run -d --name zookeeper-server -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest

  3. Install Kafka
$ sudo docker run -d --name kafka-server1 --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 bitnami/kafka:latest

  4. Check the Kafka container IP
$ sudo docker inspect --format='{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka-server1
172.18.0.2

  5. Get metadata with kafkacat
$ sudo docker run -it --network=kafka-net edenhill/kafkacat:1.6.0 -b `sudo docker inspect --format='{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka-server1` -L
Metadata for all topics (from broker -1: 172.18.0.2:9092/bootstrap):
1 brokers:
broker 1001 at localhost:9092 (controller)
0 topics:




Alias Method

 Problem: each server supports a different TPM (transactions per minute). When a request arrives, the system must immediately pick a suitable server at random according to its TPM capacity. Although it is called "random", the TPM still has to act as a weight. Solution: the Alias Method...