Intention: Why I need Kafka Transaction
- 需求中收到 message 並處理之後, application 需要另外傳送訊息出去給多個 topic.
- 不管遇到任何錯誤, 我都希望訊息就不要送出去.
- 除此之外, 原本 consume 的訊息也不要收下來
How it works
- Producer initTransaction with transaction.id
- TransactionCoordinator close existing pending transactions with same transaction.id
- Producer send message, the message will be wrote to topic
- Producer commitTransaction, the TransactionCoordinator will start the 2 phase commit process
- Write PREPARE_COMMIT to the "transactionLog" topic
- Mark "commit" status in topic partitions
- write COMMITTED to transactionLog
- After these steps, transaction was pretty much to be finished
Note: transactionLog 是 internal topic, 用 transaction.id 作為 partition key, 因此可以保證狀態的順序
如果有兩個 Producer 共用 transaction.id, 當 Kafka 發現有相同的 transaction.id 存在的時候, 就會把先前的 transaction close,
因此如果 transaction.id 沒有規劃好就會遇到 transaction 莫名的被 abort 的 error.
Atomic Read-Process-Write
- Consumer 收訊息下來後透過呼叫 commit offset 來標記已經處理完
- 這個 offset 其實也是個 topic
- 藉由前面介紹的 transaction 處理機制, 可以讓 "commitOffset" 也只是發訊息到一個 topic, 也可以被包在同一個 transaction 中
適當的 transaction.id
- transaction.id 需要夠 unique, 如此才能避免 Producer 共用 transaction.id 而被 close
- 由於每個 transaction 都需要一些額外的 request 才能完成, 所以如果 transaction.id 定太細導致一堆 transaction 會使效能大幅降低
- tx-{consumeTopic}-{consumePartition} 是一個折衷的 transactionid, 因為一個 consumer 只會對應到一個 topic 以及 partition.在 read-process-write 的 pattern 下, 這個 transaction.id 會被該 topic & partition 的 consumer 使用, 不會有 multi-thread producer with same transaction.id 的情況.也不會過於分散 (同樣是在 read-process-write pattern 下, 一個 consumer thread 會一個一個訊息處理, 每個 consumed message 都值得一個 transaction (id).
KafkaTemplate 有個設定: producerPerConsumerPartition 就是拿來建立 tx-{consumeTopic}-{consumePartition} 這樣的 transaction.id.
它的做法就是在接收訊息的時候, 把 topic & partition 記錄下來, 好在發送訊息的時候 append 到 transaction.id prefix 後面