Java异步编程神器CompletableFuture

AI 概述
在当今高并发、分布式系统时代,异步编程是现代Java开发必备技能,Java 8的CompletableFuture可解决传统Future阻塞问题,支持任务组合。文章剖析其核心机制,包括状态机设计和依赖链存储机制,通过丰富代码示例展示任务创建、结果转换、任务组合、多任务协作、异常处理等操作。还分享了生产实践技巧,如线程池选择、超时控制、避免阻塞陷阱等。建议微服务架构中结合响应式框架使用,随着Java发展,其将与轻量级线程更好结合,持续发挥重要作用。
目录
文章目录隐藏
  1. 一、CompletableFuture 核心原理
  2. 二、CompletableFuture 关键操作实战
  3. 生产实践技巧与避坑指南
  4. 结语

Java 异步编程神器 CompletableFuture

在当今高并发、分布式系统盛行的时代,异步编程已成为现代 Java 开发的必备技能。Java 8 引入的CompletableFuture不仅解决了传统 Future 的阻塞问题,更提供了强大的任务组合能力,让我们能够以声明式的方式构建复杂的异步流程。

本文将深入剖析CompletableFuture的核心机制,并通过丰富的代码示例展示其实际应用场景,最后分享生产环境中的最佳实践。

一、CompletableFuture 核心原理

1.1 状态机设计

Java 异步编程神器 CompletableFuture

CompletableFuture 内部维护一个状态机,包含三种终态:

  • Completed:任务成功完成并包含结果;
  • Cancelled:任务被显式取消;
  • Exceptionally:任务执行过程中抛出异常。

1.2 依赖链存储机制

当多个操作链式组合时,CompletableFuture 使用栈结构存储依赖关系:

future.thenApply(func1)
      .thenApply(func2)
      .thenAccept(consumer);

执行流程:

  1. 原始任务完成时触发栈顶操作;
  2. 每个操作执行后生成新阶段;
  3. 新阶段完成后触发下一依赖;
  4. 异常沿调用链传播直到被捕获。

二、CompletableFuture 关键操作实战

任务创建

在使用 CompletableFuture 进行异步编程时,首先需要创建异步任务。CompletableFuture 提供了runAsyncsupplyAsync等静态方法来创建异步任务。runAsync用于执行没有返回值的异步任务,而supplyAsync用于执行有返回值的异步任务。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCreateTask {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 使用默认线程池创建无返回值的异步任务
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> {
            System.out.println("runAsync task is running on thread: " + Thread.currentThread().getName());
            // 模拟任务执行
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("runAsync task completed");
        });
        runFuture.get();

        // 使用默认线程池创建有返回值的异步任务
        CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyAsync task is running on thread: " + Thread.currentThread().getName());
            // 模拟任务执行
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 42;
        });
        Integer result = supplyFuture.get();
        System.out.println("supplyAsync task result: " + result);


        // 使用自定义线程池创建有返回值的异步任务
        java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(2);
        CompletableFuture<Integer> customSupplyFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("custom supplyAsync task is running on thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        }, executor);
        Integer customResult = customSupplyFuture.get();
        System.out.println("custom supplyAsync task result: " + customResult);
        executor.shutdown();
    }
}

在上述代码中,runAsync方法接受一个Runnable接口的实现,在异步线程中执行任务逻辑;supplyAsync方法接受一个Supplier接口的实现,返回一个CompletableFuture对象,通过get方法可以获取异步任务的执行结果。此外,我们还展示了如何使用自定义线程池来执行异步任务,通过CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor)方法,传入自定义的线程池executor,这样可以更好地控制线程资源的使用,避免线程池资源耗尽等问题。

结果转换

当异步任务执行完成后,我们常常需要对任务的结果进行转换处理。CompletableFuture 提供了thenApplythenApplyAsync方法来实现这一功能。thenApply方法会在任务完成后,同步地对结果进行转换;而thenApplyAsync方法则会异步地对结果进行转换,使用默认线程池或指定的线程池执行转换操作。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureTransformResult {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 1 is running on thread: " + Thread.currentThread().getName());
            return 10;
        }).thenApply(result -> {
            System.out.println("thenApply in same thread: " + Thread.currentThread().getName());
            return result * 2;
        }).thenAccept(finalResult -> {
            System.out.println("Final result in same thread: " + finalResult + " on thread: " + Thread.currentThread().getName());
        });


        CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 2 is running on thread: " + Thread.currentThread().getName());
            return 20;
        }).thenApplyAsync(result -> {
            System.out.println("thenApplyAsync in different thread: " + Thread.currentThread().getName());
            return result * 3;
        }).thenAcceptAsync(finalResult -> {
            System.out.println("Final result in different thread: " + finalResult + " on thread: " + Thread.currentThread().getName());
        });
    }
}

在这段代码中,第一个CompletableFuture通过supplyAsync创建异步任务,返回结果10,接着使用thenApply方法在同一线程中对结果进行乘法运算,将结果转换为20,最后通过thenAccept消费最终结果。第二个CompletableFuture同样通过supplyAsync创建任务,返回20,然后使用thenApplyAsync方法,在不同线程中对结果进行乘法运算,转换为60,最后通过thenAcceptAsync在不同线程中消费结果。这种结果转换的方式使得我们可以方便地对异步任务的结果进行后续处理,构建复杂的异步处理流程。

任务组合

在实际开发中,我们经常会遇到需要多个异步任务协同工作的场景,这时候就需要对任务进行组合编排。CompletableFuture 提供了thenComposethenCombine等方法来满足这一需求。thenCompose用于链式组合两个有依赖关系的任务,它会将前一个任务的结果作为参数传递给下一个返回CompletableFuture的函数,然后返回一个新的CompletableFuturethenCombine则用于并行组合两个CompletableFuture,它会等待两个任务都完成后,将它们的结果合并处理,返回一个新的CompletableFuture

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureTaskComposition {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // thenCompose 示例
        CompletableFuture.supplyAsync(() -> {
            System.out.println("Task A is running on thread: " + Thread.currentThread().getName());
            return 5;
        }).thenCompose(num -> CompletableFuture.supplyAsync(() -> {
            System.out.println("Task B (depends on A) is running on thread: " + Thread.currentThread().getName());
            return num * 10;
        })).thenAccept(result -> {
            System.out.println("Result of thenCompose: " + result + " on thread: " + Thread.currentThread().getName());
        });


        // thenCombine 示例
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task C is running on thread: " + Thread.currentThread().getName());
            return 3;
        });
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task D is running on thread: " + Thread.currentThread().getName());
            return 7;
        });
        future1.thenCombine(future2, (num1, num2) -> {
            System.out.println("Combining results on thread: " + Thread.currentThread().getName());
            return num1 + num2;
        }).thenAccept(result -> {
            System.out.println("Result of thenCombine: " + result + " on thread: " + Thread.currentThread().getName());
        });
    }
}

thenCompose的示例中,任务 A 先执行,返回结果5,然后任务 B 依赖任务 A 的结果,将其乘以10,最终得到结果50。在thenCombine的示例中,任务 C 和任务 D 并行执行,分别返回37,然后将这两个结果合并,相加得到结果10。通过这些任务组合方法,我们可以灵活地构建复杂的异步任务依赖关系和并行处理逻辑。

多任务协作

在处理多个异步任务时,我们有时需要等待所有任务都完成,或者只要有一个任务完成即可进行下一步操作。CompletableFuture 的allOfanyOf方法正好可以满足这些多任务协作的需求。allOf方法接受一组CompletableFuture,返回一个新的CompletableFuture,只有当所有传入的CompletableFuture都完成时,这个新的CompletableFuture才会完成;anyOf方法同样接受一组CompletableFuture,返回的新CompletableFuture会在任何一个传入的CompletableFuture完成时就完成,其结果是第一个完成的CompletableFuture的结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureMultiTaskCollaboration {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // allOf 示例
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        });
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2;
        });
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
        allOfFuture.join();
        System.out.println("All tasks are completed. Results: " + future1.join() + ", " + future2.join());


        // anyOf 示例
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 3;
        });
        CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 4;
        });
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future3, future4);
        Object result = anyOfFuture.get();
        System.out.println("Any task is completed. Result: " + result);
    }
}

allOf示例中,future1future2并行执行,allOfFuture会等待这两个任务都完成后才会完成,通过join方法获取结果。在anyOf示例中,future3future4并行执行,anyOfFuture会在future3future4中任意一个完成时就完成,其结果是第一个完成的任务的结果,这里future4先完成,所以最终结果为4。这种多任务协作的方式在处理复杂异步业务场景时非常实用,可以有效提高系统的并发处理能力和响应速度。

异常处理

在异步任务执行过程中,难免会出现异常情况。CompletableFuture 提供了exceptionallyhandle方法来优雅地处理异常,实现优雅降级和统一结果处理。exceptionally方法用于捕获异步任务中的异常,并返回一个默认值或执行其他恢复操作;handle方法则更加强大,它无论任务是正常完成还是发生异常,都会执行,可以同时处理正常结果和异常情况。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionHandling {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // exceptionally 示例
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                return 100;
            } else {
                throw new RuntimeException("Task failed");
            }
        }).exceptionally(ex -> {
            System.out.println("Caught exception: " + ex.getMessage());
            return -1;
        }).thenAccept(result -> {
            System.out.println("Result after exception handling: " + result);
        });


        // handle 示例
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                return 200;
            } else {
                throw new RuntimeException("Another task failed");
            }
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println("Handle exception: " + ex.getMessage());
                return -2;
            } else {
                System.out.println("Handle normal result: " + result);
                return result;
            }
        }).thenAccept(finalResult -> {
            System.out.println("Final result after handle: " + finalResult);
        });
    }
}

exceptionally示例中,异步任务可能会抛出异常,如果抛出异常,exceptionally方法会捕获异常并返回-1,否则返回正常结果。在handle示例中,无论任务是否抛出异常,handle方法都会执行,通过判断ex是否为null来区分正常情况和异常情况,并进行相应的处理。这种灵活的异常处理机制可以确保我们的异步程序在遇到异常时依然能够保持健壮性和稳定性,不会因为一个任务的异常而导致整个系统崩溃 。

生产实践技巧与避坑指南

线程池的选择与优化

在使用 CompletableFuture 时,线程池的选择至关重要。对于 CPU 密集型任务,由于任务主要消耗 CPU 资源,线程在执行过程中很少会阻塞,此时线程池的核心线程数应设置为 CPU 核心数加 1,例如在一个拥有 4 核 CPU 的服务器上,核心线程数可以设置为 5,这样能充分利用 CPU 资源,同时避免因线程切换带来的额外开销。最大线程数也可设置为与核心线程数相同 ,任务队列可以选择有界队列,如ArrayBlockingQueue,防止过多任务积压导致内存消耗过大。当任务队列已满且线程数达到最大线程数时,采用抛出异常的拒绝策略,以确保重要任务能得到及时处理。

而对于 IO 密集型任务,由于任务大部分时间都在等待 I/O 操作完成,线程处于阻塞状态的时间较长,为了充分利用 CPU 资源,核心线程数可以设置为 CPU 核心数的 2 到 4 倍。例如在同样 4 核 CPU 的服务器上,核心线程数可以设置为 8 到 16 。最大线程数可以进一步增大,具体数值可根据系统负载和资源情况进行调整。任务队列适合使用无界队列,如LinkedBlockingQueue,以便能够处理大量的 I/O 请求。当线程池无法处理新任务时,拒绝策略可以选择调用者运行或丢弃最旧的任务,以应对高并发请求。避免使用默认线程池ForkJoinPool.commonPool(),因为它是一个公共的线程池,可能会被其他异步任务共享,导致资源竞争和性能不稳定。

超时控制

在实际生产环境中,为了防止异步任务长时间阻塞,设置合理的超时时间是非常必要的。CompletableFuture 提供了orTimeout方法来实现超时控制。我们可以在创建异步任务后,立即调用orTimeout方法设置超时时间。假设我们有一个查询数据库的异步任务,预计在 5 秒内完成,如果超过 5 秒还未完成,就可能是数据库出现了问题或者网络延迟过高,此时可以设置超时时间为 5 秒,当任务超过这个时间仍未完成时,orTimeout方法会抛出TimeoutException异常,我们可以通过exceptionally方法来捕获这个异常,并进行相应的处理,比如返回一个默认值或者记录错误日志,这样可以避免因任务长时间阻塞而导致系统响应变慢甚至无响应的情况。

避免阻塞陷阱

在异步线程中调用阻塞方法是一个常见的错误,这可能会导致异步编程的优势丧失,甚至引发死锁等严重问题。比如在使用 CompletableFuture 进行异步操作时,如果在thenApplythenAccept等回调方法中调用Thread.sleep或者一些同步的 I/O 操作(如读取大文件)等阻塞方法,会使当前异步线程被阻塞,无法及时执行后续的异步任务,从而影响整个系统的并发性能。正确的做法是尽量避免在异步线程中调用阻塞方法,如果确实需要进行一些耗时操作,可以将其封装成另一个异步任务,通过thenCompose方法来进行链式调用,thenCompose方法会等待前一个异步任务完成后,将其结果作为参数传递给下一个返回CompletableFuture的函数,从而实现异步操作的无缝衔接,避免阻塞问题。

异常处理

异常处理在 CompletableFuture 的生产实践中是不可忽视的环节。如果在异步任务链中没有正确处理异常,可能会导致异常被吞噬,使得程序在出现问题时难以排查和调试。例如,在一个包含多个thenApplythenAccept的链式调用中,如果某个任务抛出异常,而后续没有使用exceptionallyhandle方法进行捕获处理,异常会一直传播到链式调用的末尾,最终可能导致程序出现意想不到的错误。为了避免这种情况,我们应该在每个可能抛出异常的异步任务后面,及时使用exceptionally方法来捕获异常,并返回一个默认值或者执行一些恢复操作;或者使用handle方法,它可以同时处理正常结果和异常情况,根据不同的情况进行相应的处理,确保程序的健壮性和稳定性。

结语

什么时候选择 CompletableFuture?

场景 推荐方案
简单独立任务 ExecutorService + Future
复杂异步流水线 CompletableFuture
高并发响应式系统 Project Reactor/RxJava
CPU 密集型并行计算 Parallel Streams

核心优势总结

  1. 声明式任务组合:通过链式调用优雅组合异步任务;
  2. 非阻塞模型:最大化线程资源利用率;
  3. 灵活异常处理:提供多种异常恢复机制;
  4. 丰富 API 支持:满足各类异步编程需求;
  5. Java 生态集成:完美兼容 Stream、Optional 等特性。

最佳实践建议:在微服务架构中,将CompletableFutureSpring WebFluxReactive框架结合使用,可构建高性能响应式系统。同时,始终为耗时操作指定专用线程池,避免资源竞争。

随着 Java 21 虚拟线程的成熟,CompletableFuture将与轻量级线程更好结合,继续在异步编程领域发挥重要作用。

以上关于Java异步编程神器CompletableFuture的文章就介绍到这了,更多相关内容请搜索码云笔记以前的文章或继续浏览下面的相关文章,希望大家以后多多支持码云笔记。

「点点赞赏,手留余香」

0

给作者打赏,鼓励TA抓紧创作!

微信微信 支付宝支付宝

还没有人赞赏,快来当第一个赞赏的人吧!

声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 admin@mybj123.com 进行投诉反馈,一经查实,立即处理!
重要:如软件存在付费、会员、充值等,均属软件开发者或所属公司行为,与本站无关,网友需自行判断
码云笔记 » Java异步编程神器CompletableFuture

发表回复