Kafka - 選擇 transaction.id
動機:為什麼我需要 Kafka Transaction
- 需求中收到 message 並處理之後, application 需要另外傳送訊息出去給多個 topic.
- 不管遇到任何錯誤, 我都希望訊息就不要送出去.
- 除此之外, 原本 consume 的訊息也不要收下來
運作方式
- Producer 用 transaction.id 呼叫 initTransaction
- TransactionCoordinator 關閉使用相同 transaction.id 的現有 pending transaction
- Producer 發送訊息,訊息會被寫入 topic
- Producer 呼叫 commitTransaction,TransactionCoordinator 會開始 2 phase commit 流程
- 把 PREPARE_COMMIT 寫入 "transactionLog" topic
- 在 topic partition 標記 "commit" 狀態
- 把 COMMITTED 寫入 transactionLog
- 完成以上步驟後,transaction 就差不多結束了
注意: transactionLog 是 internal topic, 用 transaction.id 作為 partition key, 因此可以保證狀態的順序
如果有兩個 Producer 共用 transaction.id, 當 Kafka 發現有相同的 transaction.id 存在的時候, 就會把先前的 transaction close,
因此如果 transaction.id 沒有規劃好就會遇到 transaction 莫名的被 abort 的 error.
Atomic Read-Process-Write
- Consumer 收訊息下來後透過呼叫 commit offset 來標記已經處理完
- 這個 offset 其實也是個 topic
- 藉由前面介紹的 transaction 處理機制, 可以讓 "commitOffset" 也只是發訊息到一個 topic, 也可以被包在同一個 transaction 中
適當的 transaction.id
- transaction.id 需要夠 unique, 如此才能避免 Producer 共用 transaction.id 而被 close
- 由於每個 transaction 都需要一些額外的 request 才能完成, 所以如果 transaction.id 定太細導致一堆 transaction 會使效能大幅降低
- tx-{consumeTopic}-{consumePartition} 是一個折衷的 transactionid, 因為一個 consumer 只會對應到一個 topic 以及 partition.在 read-process-write 的 pattern 下, 這個 transaction.id 會被該 topic & partition 的 consumer 使用, 不會有 multi-thread producer with same transaction.id 的情況.也不會過於分散 (同樣是在 read-process-write pattern 下, 一個 consumer thread 會一個一個訊息處理, 每個 consumed message 都值得一個 transaction (id).
KafkaTemplate 有個設定: producerPerConsumerPartition 就是拿來建立 tx-{consumeTopic}-{consumePartition} 這樣的 transaction.id.
它的做法就是在接收訊息的時候, 把 topic & partition 記錄下來, 好在發送訊息的時候 append 到 transaction.id prefix 後面