Java并发-倒计时器CountDownLatch
CountDownLatch
是 Java 并发编程中的一个同步工具类,它的作用就像一个倒计时器,特别适合"等所有子任务完成后再执行主任务"的场景。比如说,你要启动一个服务,但需要先完成数据初始化、配置加载、连接池建立等多个准备工作,这时候 CountDownLatch
就派上用场了。
# 一、什么是 CountDownLatch
CountDownLatch
直译过来就是"倒计数闸门",它维护一个计数器,当计数器递减到 0 时,会释放所有等待的线程。这个过程是一次性的——计数器减到 0 之后就不能再重置了。
# 二、核心 API
CountDownLatch
的使用非常简单,主要包含一个构造函数和几个核心方法:
- CountDownLatch(int count): 构造函数,指定计数器的初始值
- countDown(): 计数器减 1,当计数到达 0 时,释放所有等待线程
- await(): 让当前线程等待,直到计数器变为 0
- await(long timeout, TimeUnit unit): 带超时的等待,避免无限期等待
# 三、基础示例:火箭发射倒计时
让我们通过一个火箭发射的例子来理解 CountDownLatch
的工作原理:
public class CountDownLatchDemo {
static final CountDownLatch LATCH = new CountDownLatch(10);
public static class Task implements Runnable {
private static final AtomicInteger counter = new AtomicInteger(1);
@Override
public void run() {
try {
// 模拟不同检测任务耗时不同
Thread.sleep(new Random().nextInt(3000) + 1000);
int taskNumber = counter.getAndIncrement();
System.out.println("火箭检测任务-" + taskNumber + " 完成,线程:" +
Thread.currentThread().getName());
// 完成一个任务,计数器减 1
LATCH.countDown();
System.out.println("剩余任务数:" + LATCH.getCount());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("任务被中断:" + e.getMessage());
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("开始火箭发射前检查,需要完成 10 项检测任务...");
ExecutorService executorService = Executors.newFixedThreadPool(10);
long startTime = System.currentTimeMillis();
// 提交所有检测任务
for (int i = 0; i < 10; i++) {
executorService.execute(new Task());
}
// 等待所有任务完成
System.out.println("等待所有检测任务完成...");
LATCH.await();
long endTime = System.currentTimeMillis();
System.out.println("🚀 所有检测完成!火箭发射!耗时:" + (endTime - startTime) + "ms");
executorService.shutdown();
}
}
运行这个例子,你会看到 10 个检测任务并发执行,只有当最后一个任务完成时,主线程才会继续执行"火箭发射"。
# 四、带超时的示例
有时候我们不能无限等待,需要设置超时机制:
public class CountDownLatchTimeoutDemo {
private static final CountDownLatch latch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 启动任务
for (int i = 1; i <= 3; i++) {
int taskId = i;
executor.submit(() -> {
try {
// 模拟第3个任务执行很久
if (taskId == 3) {
Thread.sleep(5000);
} else {
Thread.sleep(1000);
}
System.out.println("任务 " + taskId + " 完成");
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 等待最多3秒
boolean completed = latch.await(3, TimeUnit.SECONDS);
if (completed) {
System.out.println("所有任务都在规定时间内完成了");
} else {
System.out.println("有任务超时了,当前剩余任务数:" + latch.getCount());
}
executor.shutdownNow();
}
}
# 五、实际应用场景
# 1、服务启动等待
public class ServiceStartupDemo {
private final CountDownLatch startupLatch = new CountDownLatch(3);
public void startApplication() throws InterruptedException {
System.out.println("正在启动应用服务...");
// 异步初始化各个组件
CompletableFuture.runAsync(this::initDatabase);
CompletableFuture.runAsync(this::loadConfiguration);
CompletableFuture.runAsync(this::setupConnectionPool);
// 等待所有组件初始化完成
startupLatch.await();
System.out.println("应用启动完成,开始接受请求");
}
private void initDatabase() {
try {
Thread.sleep(2000); // 模拟数据库初始化
System.out.println("数据库初始化完成");
startupLatch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void loadConfiguration() {
try {
Thread.sleep(1500);
System.out.println("配置加载完成");
startupLatch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void setupConnectionPool() {
try {
Thread.sleep(1000);
System.out.println("连接池设置完成");
startupLatch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
# 2、并行计算汇总
public class ParallelComputeDemo {
public static void main(String[] args) throws InterruptedException {
int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int workerCount = 4;
CountDownLatch latch = new CountDownLatch(workerCount);
AtomicInteger totalSum = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(workerCount);
// 分片并行计算
int chunkSize = numbers.length / workerCount;
for (int i = 0; i < workerCount; i++) {
int startIndex = i * chunkSize;
int endIndex = (i == workerCount - 1) ? numbers.length : startIndex + chunkSize;
executor.submit(() -> {
int partialSum = 0;
for (int j = startIndex; j < endIndex; j++) {
partialSum += numbers[j];
}
totalSum.addAndGet(partialSum);
System.out.println("Worker " + Thread.currentThread().getName() +
" 计算完成,部分和:" + partialSum);
latch.countDown();
});
}
latch.await();
System.out.println("所有计算完成,总和:" + totalSum.get());
executor.shutdown();
}
}
# 六、使用注意事项
# 1、计数器无法重置
CountDownLatch
是一次性的,计数器到 0 后就不能再重复使用。如果需要重复使用,考虑使用 CyclicBarrier
。
# 2、避免死锁
确保 countDown()
一定会被调用,否则等待的线程会一直阻塞:
// 推荐的异常安全做法
public void doTask() {
try {
// 执行任务逻辑
performBusinessLogic();
} finally {
// 确保计数器一定会减少
latch.countDown();
}
}
# 3、合理设置超时
在生产环境中,建议使用带超时的 await()
方法,避免无限等待:
if (!latch.await(30, TimeUnit.SECONDS)) {
throw new TimeoutException("任务执行超时");
}
# 七、性能特点
- 内存开销小:
CountDownLatch
内部基于 AQS(AbstractQueuedSynchronizer)实现,内存占用很小 - 性能优秀:
countDown()
和await()
操作都是高效的 - 线程安全: 所有操作都是线程安全的,无需额外同步
# 八、总结
CountDownLatch
是一个简单而强大的同步工具,特别适合以下场景:
- 主线程等待多个子线程完成后再继续执行
- 确保某些资源在并发访问前已经准备就绪
- 实现多线程的"集合点",等所有线程都到达某个状态
它的使用非常简单,但要注意是一次性的,使用完就不能重复使用了。在实际开发中,记住合理设置超时和做好异常处理,就能很好地发挥它的作用。
编辑 (opens new window)
上次更新: 2025/08/14