简化Java异步任务同步的利器LatchUtils
在 Java 应用开发中,为了提升系统性能和响应速度,我们经常需要将一些耗时操作(如调用外部 API、查询数据库、复杂计算等)进行异步并行处理。当主流程需要等待所有这些并行任务执行完毕后再继续时,我们通常会用到ExecutorService 、CountDownLatch 等并发工具。
然而,直接使用这些原生工具,往往意味着需要编写一些重复的、模式化的“胶水代码”,这不仅增加了代码量,也让核心业务逻辑显得不够清晰。
为了解决这个问题,我封装了一个名为LatchUtils的轻量级工具类。它能够以一种极其简洁的方式来组织和管理这一类异步任务。
详细代码
其代码如下,后面会有使用说明和示例以及和传统实现代码的对比
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
publicclass LatchUtils {
privatestaticfinal ThreadLocal<List<TaskInfo>> THREADLOCAL = ThreadLocal.withInitial(LinkedList::new);
public static void submitTask(Executor executor, Runnable runnable) {
THREADLOCAL.get().add(new TaskInfo(executor, runnable));
}
private static List<TaskInfo> popTask() {
List<TaskInfo> taskInfos = THREADLOCAL.get();
THREADLOCAL.remove();
return taskInfos;
}
public static boolean waitFor(long timeout, TimeUnit timeUnit) {
List<TaskInfo> taskInfos = popTask();
if (taskInfos.isEmpty()) {
returntrue;
}
CountDownLatch latch = new CountDownLatch(taskInfos.size());
for (TaskInfo taskInfo : taskInfos) {
Executor executor = taskInfo.executor;
Runnable runnable = taskInfo.runnable;
executor.execute(() -> {
try {
runnable.run();
} finally {
latch.countDown();
}
});
}
boolean await = false;
try {
await = latch.await(timeout, timeUnit);
} catch (Exception ignored) {
}
return await;
}
privatestaticfinalclass TaskInfo {
privatefinal Executor executor;
privatefinal Runnable runnable;
public TaskInfo(Executor executor, Runnable runnable) {
this.executor = executor;
this.runnable = runnable;
}
}
}
核心思想
LatchUtils 的设计哲学是:多次提交,一次等待。
- 任务注册: 在主流程代码中,可以先通过
LatchUtils.submitTask()提交 Runnable 任务和其对应的 Executor(该线程池用来执行这个 Runnable)。 - 执行并等待:当并行任务都提交完毕后,你只需调用一次
LatchUtils.waitFor()。该方法会立即触发所有已注册任务的执行,并阻塞等待所有任务执行完成或超时。
API 概览
这个工具类对外暴露的接口极其简单,只有两个核心静态方法:
submitTask()
public static void submitTask(Executor executor, Runnable runnable)
功能:提交一个异步任务。
参数:
- executor:
java.util.concurrent.Executor– 指定执行此任务的线程池。 - runnable:
java.lang.Runnable– 需要异步执行的具体业务逻辑。
waitFor()
public static boolean waitFor(long timeout, TimeUnit timeUnit)
功能:触发所有已提交任务的执行,并同步等待它们全部完成。
参数:
- timeout:
long– 最长等待时间。 - timeUnit:
java.util.concurrent.TimeUnit– 等待时间单位。
返回值:
- true:如果所有任务在指定时间内成功完成。
- false:如果等待超时。
注意:该方法在执行后会自动清理当前线程提交的任务列表,因此可以重复使用。
实战示例
让我们来看一个典型的应用场景:一个聚合服务需要同时调用用户服务、订单服务和商品服务,拿到所有结果后再进行下一步处理。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
publicclass Main {
public static void main(String[] args) {
// 1. 准备一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("主流程开始,准备分发异步任务...");
// 2. 提交多个异步任务
// 任务一:获取用户信息
LatchUtils.submitTask(executorService, () -> {
try {
System.out.println("开始获取用户信息...");
Thread.sleep(1000); // 模拟耗时
System.out.println("获取用户信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 任务二:获取订单信息
LatchUtils.submitTask(executorService, () -> {
try {
System.out.println("开始获取订单信息...");
Thread.sleep(1500); // 模拟耗时
System.out.println("获取订单信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 任务三:获取商品信息
LatchUtils.submitTask(executorService, () -> {
try {
System.out.println("开始获取商品信息...");
Thread.sleep(500); // 模拟耗时
System.out.println("获取商品信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("所有异步任务已提交,主线程开始等待...");
// 3. 等待所有任务完成,最长等待 5 秒
boolean allTasksCompleted = LatchUtils.waitFor(5, TimeUnit.SECONDS);
// 4. 根据等待结果继续主流程
if (allTasksCompleted) {
System.out.println("所有异步任务执行成功,主流程继续...");
} else {
System.err.println("有任务执行超时,主流程中断!");
}
// 5. 关闭线程池
executorService.shutdown();
}
}
输出结果:
主流程开始,准备分发异步任务... 所有异步任务已提交,主线程开始等待... 开始获取商品信息... 开始获取用户信息... 开始获取订单信息... 获取商品信息成功! 获取用户信息成功! 获取订单信息成功! 所有异步任务执行成功,主流程继续...
从这个例子中可以看到,业务代码变得非常清晰。我们只需要关注“提交任务”和“等待结果”这两个动作,而无需关心 CountDownLatch的初始化、 countDown() 的调用以及异常处理等细节。
对比:如果不使用 LatchUtils
为了更好地理解LatchUtils带来的价值,让我们看看要实现与上面完全相同的功能,用传统的 Java 并发 API 需要如何编写代码。 通常有两种主流方式:使用CountDownLatch或使用CompletableFuture 。
方式一:直接使用 CountDownLatch
这是最经典的方式,开发者需要手动管理 CountDownLatch 的生命周期。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
publicclass ManualCountDownLatchExample {
public static void main(String[] args) {
// 1. 准备一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 2. 手动初始化 CountDownLatch,数量为任务数
CountDownLatch latch = new CountDownLatch(3);
System.out.println("主流程开始,准备分发异步任务...");
// 3. 提交任务,并在每个任务的 finally 块中手动调用 latch.countDown()
// 任务一:获取用户信息
executorService.execute(() -> {
try {
System.out.println("开始获取用户信息...");
Thread.sleep(1000);
System.out.println("获取用户信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 手动减一
}
});
// 任务二:获取订单信息
executorService.execute(() -> {
try {
System.out.println("开始获取订单信息...");
Thread.sleep(1500);
System.out.println("获取订单信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 手动减一
}
});
// 任务三:获取商品信息
executorService.execute(() -> {
try {
System.out.println("开始获取商品信息...");
Thread.sleep(500);
System.out.println("获取商品信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 手动减一
}
});
System.out.println("所有异步任务已提交,主线程开始等待...");
// 4. 手动调用 latch.await() 进行等待
boolean allTasksCompleted = false;
try {
allTasksCompleted = latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// 需要处理中断异常
Thread.currentThread().interrupt();
System.err.println("主线程在等待时被中断!");
}
// 5. 根据等待结果继续主流程
if (allTasksCompleted) {
System.out.println("所有异步任务执行成功,主流程继续...");
} else {
System.err.println("有任务执行超时,主流程中断!");
}
// 6. 关闭线程池
executorService.shutdown();
}
}
方式二:使用 CompletableFuture
使用CompletableFuture实现,其代码如下
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
publicclass CompletableFutureExample {
public static void main(String[] args) {
// 1. 准备一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("主流程开始,准备分发异步任务...");
// 2. 创建 CompletableFuture 任务
CompletableFuture<Void> userFuture = CompletableFuture.runAsync(() -> {
try {
System.out.println("开始获取用户信息...");
Thread.sleep(1000);
System.out.println("获取用户信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executorService);
CompletableFuture<Void> orderFuture = CompletableFuture.runAsync(() -> {
try {
System.out.println("开始获取订单信息...");
Thread.sleep(1500);
System.out.println("获取订单信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executorService);
CompletableFuture<Void> productFuture = CompletableFuture.runAsync(() -> {
try {
System.out.println("开始获取商品信息...");
Thread.sleep(500);
System.out.println("获取商品信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executorService);
System.out.println("所有异步任务已提交,主线程开始等待...");
// 3. 使用 CompletableFuture.allOf 将所有任务组合起来
CompletableFuture<Void> allFutures = CompletableFuture.allOf(userFuture, orderFuture, productFuture);
// 4. 等待组合后的 Future 完成
try {
allFutures.get(5, TimeUnit.SECONDS);
System.out.println("所有异步任务执行成功,主流程继续...");
} catch (Exception e) {
// 需要处理多种异常,如 InterruptedException, ExecutionException, TimeoutException
System.err.println("任务执行超时或出错,主流程中断! " + e.getMessage());
}
// 5. 关闭线程池
executorService.shutdown();
}
}
对比分析

结论很明显:
对于“分发一组并行任务,然后等待它们全部完成”这一特定但常见的模式,LatchUtils 通过适度的封装,极大地简化了开发者的工作。它隐藏了并发控制的复杂性,让业务代码回归其本质,从而提高了代码的可读性和可维护性。
以上关于简化Java异步任务同步的利器LatchUtils的文章就介绍到这了,更多相关内容请搜索码云笔记以前的文章或继续浏览下面的相关文章,希望大家以后多多支持码云笔记。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 admin@mybj123.com 进行投诉反馈,一经查实,立即处理!
重要:如软件存在付费、会员、充值等,均属软件开发者或所属公司行为,与本站无关,网友需自行判断
码云笔记 » 简化Java异步任务同步的利器LatchUtils
微信
支付宝