java 线程池 + 分批次批量插入代码
线程池 分批次批量插入代码import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * 批量导入工具类 * 线程池分批插入 安全等待 */ Component public class UserBatchInsert { Resource private UserMapper userMapper; // 核心线程数根据服务器CPU核心数设置推荐 CPU核心数*2 private static final int CORE_THREAD_NUM 8; // 线程池拒绝策略调用者线程执行保证任务不丢失 private static final RejectedExecutionHandler HANDLER new ThreadPoolExecutor.CallerRunsPolicy(); /** * 线程池分批批量插入 * param data 总数据集合 * param batchNum 每批次插入条数推荐 500~1000 * throws InterruptedException 中断异常 */ public void batchInsertUser(ListUser data, int batchNum) throws InterruptedException { // 1. 参数校验 if (data null || data.isEmpty()) { return; } if (batchNum 0) { batchNum 500; // 默认批次 } int totalSize data.size(); // 计算总批次 int batchCount (totalSize batchNum - 1) / batchNum; // 自定义线程池固定核心线程拒绝OOM ThreadPoolExecutor executor new ThreadPoolExecutor( CORE_THREAD_NUM, CORE_THREAD_NUM, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), HANDLER ); CountDownLatch countDownLatch new CountDownLatch(batchCount); try { for (int i 0; i batchCount; i) { int start i * batchNum; int end Math.min(start batchNum, totalSize); // 关键subList 转为新集合避免并发异常 ListUser subList new ArrayList(data.subList(start, end)); // 提交任务 executor.execute(new InsertTask(subList, countDownLatch)); } // 等待所有批次插入完成 countDownLatch.await(); } finally { // 优雅关闭线程池 executor.shutdown(); if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { executor.shutdownNow(); } } } /** * 静态内部类插入任务 * 必须加 static避免外部类引用泄漏 */ static class InsertTask implements Runnable { private final ListUser userList; private final CountDownLatch countDownLatch; public InsertTask(ListUser userList, CountDownLatch countDownLatch) { this.userList userList; this.countDownLatch countDownLatch; } Override public void run() { try { // 执行业务批量插入 if (userList ! null !userList.isEmpty()) { // 调用MyBatis批量插入 userList.forEach(System.out::println); // userMapper.insertUserList(userList); } } catch (Exception e) { // 捕获异常避免任务失败导致计数器不执行 e.printStackTrace(); } finally { // 必须放在finally无论成功/失败都计数-1 countDownLatch.countDown(); } } } }Mapper 批量插入接口import org.apache.ibatis.annotations.Param; import java.util.List; public interface UserMapper { /** * 批量新增用户 */ void insertUserList(Param(list) ListUser list); }XML SQLMyBatisinsert idinsertUserList INSERT INTO user(id,name,phone) VALUES foreach collectionlist itemitem separator, (#{item.id},#{item.name},#{item.phone}) /foreach /insert使用示例Autowired private UserBatchInsert userBatchInsert; public void test() throws InterruptedException { // 模拟10000条数据 ListUser userList new ArrayList(); // 每500条一批多线程插入 userBatchInsert.batchInsertUser(userList, 500); }重要生产提醒批次大小推荐500~1000 条 / 批性能最优事务问题多线程插入无法共用一个事务每批次是独立事务适用场景大量数据初始化、导入、同步