Goal
描述如何套用 spring cloud stream + Kafka 以及概念.
適合只面對 Kafka, 不涵蓋進階議題
Example
Concepts
- 一個 application 的架構: 外部系統(middleware) -> input -> application process -> output -> 外部系統
- 在 Spring Cloud Stream
- 透過 Binder 來處理外部系統的細節. ex. spring-cloud-stream-binder-kafka
- 透過 Binding 來處理 input.
Example
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
Kafka topic consumer
- Consumer method (注意! 這裡的 method name: status 就是 Binding name, 會影響 application.properties)
@Bean
public Consumer<String> status() {
return status -> System.out.println("Received " + status);
}
# 如果會從多個 topic 拿 message 就需要在 spring.cloud.function.definition 這個 attribute 指定, 並且用分號隔開. 這裡是預先指定一個
spring.cloud.function.definition=status
# 可以看到這的 pattern: spring.cloud.stream.bindings.{bindingName}-in-0.destination, 用來指定 topic name
# in 的部分是說 input topic
# 0 則是這個 binding 的第一個 input
spring.cloud.stream.bindings.status-in-0.destination=status
- 再來就可以發訊息給 status 這個 topic
Function
透過 Function 可以處理 input -> process -> output
- Function: 收到一個訊息後, 在尾巴貼上 random suffix, 然後回傳
@Bean
public Function<String, String> randomNumberSuffix() {
return val -> val + " => append suffix " + Math.random();
}
- 指定 input & output topic in application.properties
# 注意此時我們已經加上第二個 binding
spring.cloud.function.definition=status;randomNumberSuffix
# 指定 randomNumberSuffix 的 intput topic 是 randomNumberSuffix, output topic 則是 status, 也就是同一個 application 的另一個 binding
spring.cloud.stream.bindings.randomNumberSuffix-in-0.destination=randomNumberSuffix
spring.cloud.stream.bindings.randomNumberSuffix-out-0.destination=status
- 發訊息給 randomNumberSuffix topic
Supplier
Supplier 是會被系統自動 polling, 預設一秒 poll 一次, 也可以指定 cron, 要 3.2 版之後才支援 custom binding polling configuration.
@Bean
public Supplier<Date> mydate() {
return () -> new Date();
}
- 指定 output topic 給 mydate 這個 Supplier binding
# 此時已經增加第三個 binding definition
spring.cloud.function.definition=status;randomNumberSuffix;mydate
# 指定 output topic 為 status, 就是同個 app 的 topic
spring.cloud.stream.bindings.mydate-out-0.destination=status
# 改變預設的 poll config 為 2 秒 poll 一次
spring.cloud.stream.poller.fixed-delay=2000
- Supplier 只要打開 app 就會自動被執行
Other Concepts
- Consumer Group
- 跟 Kafka Consumer Group 的概念一樣
- 同一個 group 裡面只會有一個 consumer 收到 message
- 不同 group 則都會收到訊息
- 預設每個 consumer 都是不同的 group (anomymous group)
- 透過 {binding}.group=xxx 來指定 groupName
- Durability
- 有指定 group, 則對 consumer 的 subscription 就會被保留, 即使這個 group 目前沒有 consumer, 等 consumer 回來, 就會接著收到訊息
- anonymous group 的 subscription 就不會被保留, 因此 anonymous group 容易收到 duplicated message
- Partition
- 一個 topic 可以被切成多個 partition, 每個 partition 會由固定的一個 consumer 接收資料