Spring Cloud Stream Introduction - 1

Goal
描述如何套用 spring cloud stream + Kafka 以及概念.
適合只面對 Kafka, 不涵蓋進階議題

Example

Concepts
  • 一個 application 的架構: 外部系統(middleware) -> input -> application process -> output -> 外部系統
  • 在 Spring Cloud Stream
    • 透過 Binder 來處理外部系統的細節. ex. spring-cloud-stream-binder-kafka
    • 透過 Binding 來處理 input.

Example
  • Setup
    • Spring Boot Application
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

Kafka topic consumer
  • Consumer method (注意! 這裡的 method name: status 就是 Binding name, 會影響 application.properties)
@Bean
public Consumer<String> status() {
    return status -> System.out.println("Received " + status);
}
# 如果會從多個 topic 拿 message 就需要在 spring.cloud.function.definition 這個 attribute 指定, 並且用分號隔開. 這裡是預先指定一個
spring.cloud.function.definition=status

# 可以看到這的 pattern: spring.cloud.stream.bindings.{bindingName}-in-0.destination, 用來指定 topic name
# in 的部分是說 input topic
# 0 則是這個 binding 的第一個 input
spring.cloud.stream.bindings.status-in-0.destination=status
  • 再來就可以發訊息給 status 這個 topic

Function
透過 Function 可以處理 input -> process -> output
  • Function: 收到一個訊息後, 在尾巴貼上 random suffix, 然後回傳
@Bean
public Function<String, String> randomNumberSuffix() {
    return val -> val + " => append suffix " + Math.random();
}
  • 指定 input & output topic in application.properties
# 注意此時我們已經加上第二個 binding
spring.cloud.function.definition=status;randomNumberSuffix

# 指定 randomNumberSuffix 的 intput topic 是 randomNumberSuffix, output topic 則是 status, 也就是同一個 application 的另一個 binding
spring.cloud.stream.bindings.randomNumberSuffix-in-0.destination=randomNumberSuffix
spring.cloud.stream.bindings.randomNumberSuffix-out-0.destination=status
  • 發訊息給 randomNumberSuffix topic

Supplier
Supplier 是會被系統自動 polling, 預設一秒 poll 一次, 也可以指定 cron, 要 3.2 版之後才支援 custom binding polling configuration.
  • Supplier
@Bean
public Supplier<Date> mydate() {
    return () -> new Date();
}
  • 指定 output topic 給 mydate 這個 Supplier binding
# 此時已經增加第三個 binding definition
spring.cloud.function.definition=status;randomNumberSuffix;mydate

# 指定 output topic 為 status, 就是同個 app 的 topic
spring.cloud.stream.bindings.mydate-out-0.destination=status

# 改變預設的 poll config 為 2 秒 poll 一次
spring.cloud.stream.poller.fixed-delay=2000
  • Supplier 只要打開 app 就會自動被執行

Other Concepts
  • Consumer Group
    • 跟 Kafka Consumer Group 的概念一樣
    • 同一個 group 裡面只會有一個 consumer 收到 message
    • 不同 group 則都會收到訊息
    • 預設每個 consumer 都是不同的 group (anomymous group)
    • 透過 {binding}.group=xxx 來指定 groupName
  • Durability
    • 有指定 group, 則對 consumer 的 subscription 就會被保留, 即使這個 group 目前沒有 consumer, 等 consumer 回來, 就會接著收到訊息
    • anonymous group 的 subscription 就不會被保留, 因此 anonymous group 容易收到 duplicated message
  • Partition
    • 一個 topic 可以被切成多個 partition, 每個 partition 會由固定的一個 consumer 接收資料

 

沒有留言:

張貼留言

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...