Kafka - Pick a transaction.id

Intention: Why I need Kafka Transaction
  • 需求中收到 message 並處理之後, application 需要另外傳送訊息出去給多個 topic.
  • 不管遇到任何錯誤, 我都希望訊息就不要送出去.
  • 除此之外, 原本 consume 的訊息也不要收下來

How it works
  1. Producer initTransaction with transaction.id
  2. TransactionCoordinator close existing pending transactions with same transaction.id
  3. Producer send message, the message will be wrote to topic
  4. Producer commitTransaction, the TransactionCoordinator will start the 2 phase commit process
    1. Write PREPARE_COMMIT to the "transactionLog" topic
    2. Mark "commit" status in topic partitions
    3. write COMMITTED to transactionLog
  5. After these steps, transaction was pretty much to be finished
Note: transactionLog 是 internal topic, 用 transaction.id 作為 partition key, 因此可以保證狀態的順序

如果有兩個 Producer 共用 transaction.id, 當 Kafka 發現有相同的 transaction.id 存在的時候, 就會把先前的 transaction close,
因此如果 transaction.id 沒有規劃好就會遇到 transaction 莫名的被 abort 的 error.

Atomic Read-Process-Write
  • Consumer 收訊息下來後透過呼叫 commit offset 來標記已經處理完
  • 這個 offset 其實也是個 topic
  • 藉由前面介紹的 transaction 處理機制, 可以讓 "commitOffset" 也只是發訊息到一個 topic, 也可以被包在同一個 transaction 中

適當的 transaction.id
  • transaction.id 需要夠 unique, 如此才能避免 Producer 共用 transaction.id 而被 close
  • 由於每個 transaction 都需要一些額外的 request 才能完成, 所以如果 transaction.id 定太細導致一堆 transaction 會使效能大幅降低
  • tx-{consumeTopic}-{consumePartition} 是一個折衷的 transactionid, 因為一個 consumer 只會對應到一個 topic 以及 partition.
    在 read-process-write 的 pattern 下, 這個 transaction.id 會被該 topic & partition 的 consumer 使用, 不會有 multi-thread producer with same transaction.id 的情況.
    也不會過於分散 (同樣是在 read-process-write pattern 下, 一個 consumer thread 會一個一個訊息處理, 每個 consumed message 都值得一個 transaction (id).

KafkaTemplate 有個設定: producerPerConsumerPartition 就是拿來建立 tx-{consumeTopic}-{consumePartition} 這樣的 transaction.id.

它的做法就是在接收訊息的時候, 把 topic & partition 記錄下來, 好在發送訊息的時候 append 到 transaction.id prefix 後面

