Kafka 事务机制 跨分区 跨会话 通俗讲解 可运行代码示例一、先白话定义1. 跨分区写入一次业务需要往多个分区 / 多个主题发送多条消息。事务保证多条消息要么全部提交消费者可见要么全部回滚一条都看不见不会中间成功一半。2. 跨会话生产者中途宕机、重启、换实例新的生产者会话还能接手之前未完成的事务继续提交 / 回滚保证原子性不被破坏。3. 解决的问题不用事务可能订单消息发成功库存消息发失败 → 数据不一致。用事务同批次多条消息同生共死。二、核心前置配置Kafka 事务必须配置开启幂等生产者指定transactional.id配置事务超时时间三、代码示例跨分区 / 跨主题 事务原子写入场景下单成功要同时发两条消息订单主题order_topic库存主题stock_topic要求两条消息要么都成功要么都撤回。import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaTransactionDemo { public static void main(String[] args) { Properties props new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9092); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 1. 开启幂等生产者事务依赖 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 2. 设置事务唯一ID props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction-id-001); // 3. 事务超时时间 props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); ProducerString, String producer new KafkaProducer(props); try { // 初始化事务 producer.initTransactions(); // 开启事务 producer.beginTransaction(); // 跨主题/跨分区 发送多条消息 // 消息1订单主题 ProducerRecordString, String orderMsg new ProducerRecord(order_topic, order_001, 创建订单成功); producer.send(orderMsg); // 消息2库存主题 ProducerRecordString, String stockMsg new ProducerRecord(stock_topic, stock_001, 扣减商品库存); producer.send(stockMsg); // 模拟中间业务异常测试回滚 // int i 1 / 0; // 正常提交事务两条消息同时对消费者可见 producer.commitTransaction(); } catch (Exception e) { // 异常回滚事务两条消息全部不可见 producer.abortTransaction(); e.printStackTrace(); } finally { producer.close(); } } }四、代码逻辑解释initTransactions()初始化事务环境beginTransaction()开启事务连续往 ** 不同主题本质就是跨分区** 发两条消息无异常commitTransaction()→ 两条消息同时生效消费者都能消费有异常abortTransaction()→ 两条消息全部作废消费者一条都看不到完美解决一部分成功、一部分失败的数据不一致问题。五、跨会话 是什么代码层面理解上面代码transactional.idtransaction-id-001是全局唯一的如果生产者运行中宕机、重启新生产者配置同一个 transactional.idKafka 会识别这是同一个事务会话可以查询上一次事务状态继续提交或回滚 这就叫跨会话事务恢复生产者换了进程、重启了事务还能接着收尾不会卡住、不会脏数据。六、面试背诵版结合代码总结Kafka 事务机制支持跨分区、跨主题原子写入也支持跨生产者会话事务恢复。代码层面通过配置transactional.id、开启幂等生产者再通过beginTransaction开启事务批量向多个主题 / 分区发送消息正常执行就commitTransaction批量生效出现异常就abortTransaction全部回滚保证多条消息要么全部成功、要么全部失败解决多消息写入的数据一致性问题生产者宕机重启后通过相同事务 ID 可跨会话承接上一次事务状态保证原子性不被破坏。