當前位置: 妍妍網 > 碼農

介面響應慢?那是你沒用 CompletableFuture 來最佳化!

2024-03-15碼農

前言

大多數程式設計師在平時工作中,都是增刪改查。這裏我跟大家講解如何利用 CompletableFuture 最佳化計畫程式碼,使計畫效能更佳!

為什麽要用異步編程

舉個例子:使用者登入成功,需要返回前端使用者角色,選單許可權,個人資訊,使用者余額,積分情況等。正常邏輯是依次查詢不同表,得到對應的數據封裝返回給前端,程式碼如下:

@Test
public void login(Long userId){
log.info("開始查詢使用者全部資訊---序列!");
// 查詢使用者角色資訊
getUserRole(userId);
// 查詢使用者選單資訊
getUserMenu(userId);
// 查詢使用者余額資訊
getUserAmount(userId);
// 查詢使用者積分資訊
getUserIntegral(userId);
log.info("封裝使用者資訊返回給前端!");
}

假如查詢使用者角色,使用者選單,使用者余額,使用者積分分別耗時500,200,200,100毫秒,則登入介面耗時為1秒。如果采用異步(多執行緒並列)形式,則登入介面耗時以單個查詢最慢的任務為主,為查詢使用者角色資訊500毫秒。相當於登入介面效能提升一倍!查詢任務越多,則其效能提升越大!

程式碼演示(序列):

@Test
public void login() throws InterruptedException {
long startTime = System.currentTimeMillis();
log.info("開始查詢使用者全部資訊!");
log.info("開始查詢使用者角色資訊!");
Thread.sleep(500);
String role = "管理員";
log.info("開始查詢使用者選單資訊!");
Thread.sleep(200);
String menu = "首頁,帳戶管理,積分管理";
log.info("開始查詢查詢使用者余額資訊!");
Thread.sleep(200);
Integer amount = 1999;
log.info("開始查詢查詢查詢使用者積分資訊!");
Thread.sleep(100);
Integer integral = 1015;
log.info("封裝使用者資訊返回給前端!");
log.info("查詢使用者全部資訊總耗時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}

結果:

圖片

程式碼演示(異步):

@Test
public void asyncLogin() {
long startTime = System.currentTimeMillis();
log.info("開始查詢使用者角色資訊!");
CompletableFuture<Map<String, Object>> roleFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> roleMap = new HashMap<String, Object>();
roleMap.put("role""管理員");
long endTime = System.currentTimeMillis();
log.info("查詢使用者角色資訊耗時:" + (endTime - startTime) + "毫秒");
return roleMap;
});
log.info("開始查詢使用者選單資訊!");
CompletableFuture<Map<String, Object>> menuFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> menuMap = new HashMap<String, Object>();
menuMap.put("menu""首頁,帳戶管理,積分管理");
long endTime = System.currentTimeMillis();
log.info("查詢使用者選單資訊耗時:" + (endTime - startTime) + "毫秒");
return menuMap;
});
log.info("開始查詢使用者余額資訊!");
CompletableFuture<Map<String, Object>> amountFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> amountMap = new HashMap<String, Object>();
amountMap.put("amount", 1999);
long endTime = System.currentTimeMillis();
log.info("查詢使用者余額資訊耗時:" + (endTime - startTime) + "毫秒");
return amountMap;
});
log.info("開始查詢使用者積分資訊!");
CompletableFuture<Map<String, Object>> integralFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> integralMap = new HashMap<String, Object>();
integralMap.put("integral", 1015);
long endTime = System.currentTimeMillis();
log.info("查詢使用者積分資訊耗時:" + (endTime - startTime) + "毫秒");
return integralMap;
});
roleFuture.join();
menuFuture.join();
amountFuture.join();
integralFuture.join();
log.info("查詢使用者全部資訊總耗時:" + (System.currentTimeMillis() - startTime) + "毫秒");
}



結果:

圖片

直觀的可以看出,異步執行的優勢!

回顧Future

Future是什麽?

  • Java 1.5中引入Callable解決多執行緒執行無返回值的問題。

  • Future是為了配合Callable/Runnable而產生的。簡單來講,我們可以透過future來對任務查詢、取消、執行結果的獲取,是呼叫方與異步執行方之間溝通的橋梁。

  • FutureTask 實作了RunnableFuture介面,同時具有Runnable、Future的能力,即既可以作為Future得到Callable的返回值,又可以作為一個Runnable。

  • CompletableFuture實作了Futrue介面。

  • Future是Java5新加的一個介面,它提供了一種異步平行計算的功能。如果主執行緒需要執行一個很耗時的計算任務,我們可以將這個任務透過Future放到異步執行緒中去執行。主執行緒繼續處理其他任務,處理完成後,再透過Future獲取計算結果。

  • Future可以在連續流程中滿足數據驅動的並行需求,既獲得了並行執行的效能提升,又不失連續流程的簡潔優雅。

  • 程式碼演示(不使用自訂執行緒池):

    @Test
    public void callable() throws ExecutionException, InterruptedException {
    long startTime = System.currentTimeMillis();
    Callable amountCall = new Callable() {
    @Override
    public Object call() throws Exception {
    long startTime = System.currentTimeMillis();
    Thread.sleep(6000);
    Map<String, Object> amountMap = new HashMap<String, Object>();
    amountMap.put("amount", 99);
    long endTime = System.currentTimeMillis();
    log.info("查詢金額資訊耗時:" + (endTime - startTime) / 1000 + "秒");
    return amountMap;
    }
    };
    FutureTask<Map> amountFuture = new FutureTask<>(amountCall);
    new Thread(amountFuture).start();
    Callable roleCall = new Callable() {
    @Override
    public Object call() throws Exception {
    long startTime = System.currentTimeMillis();
    Thread.sleep(5000);
    Map<String, String> roleMap = new HashMap<String, String>();
    roleMap.put("name""管理員");
    long endTime = System.currentTimeMillis();
    log.info("查詢角色資訊耗時:" + (endTime - startTime) / 1000 + "秒");
    return roleMap;
    }
    };
    FutureTask<Map> roleFuture = new FutureTask<>(roleCall);
    new Thread(roleFuture).start();
    log.info("金額查詢結果為:" + amountFuture.get());
    log.info("角色查詢結果為:" + roleFuture.get());
    long endTime = System.currentTimeMillis();
    log.info("總耗時:" + (endTime - startTime) / 1000 + "秒");
    }





    圖片

    這裏要註意:Future對於結果的獲取,不是很友好,只能透過阻塞或者輪詢的方式得到任務的結果。 Future.get() 就是阻塞呼叫,線上程獲取結果之前get方法會一直阻塞;Future提供了一個isDone方法,可以在程式中輪詢這個方法查詢執行結果。

    這裏的 amountFuture.get() 如果放到如下圖所示的位置,則amountFuture下面的執行緒將等 amountFuture.get() 完成後才能執行,沒有執行完,則一直阻塞。

    圖片

    結果:

    圖片

    程式碼演示(使用自訂執行緒池):

    @Test
    public void executor() throws ExecutionException, InterruptedException {
    long startTime = System.currentTimeMillis();
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Callable amountCall = new Callable() {
    @Override
    public Object call() throws Exception {
    long startTime = System.currentTimeMillis();
    Thread.sleep(6000);
    Map<String, Object> amountMap = new HashMap<String, Object>();
    amountMap.put("amount", 99);
    long endTime = System.currentTimeMillis();
    log.info("查詢金額資訊耗時:" + (endTime - startTime) / 1000 + "秒");
    return amountMap;
    }
    };
    Callable roleCall = new Callable() {
    @Override
    public Object call() throws Exception {
    long startTime = System.currentTimeMillis();
    Thread.sleep(5000);
    Map<String, String> roleMap = new HashMap<String, String>();
    roleMap.put("name""管理員");
    long endTime = System.currentTimeMillis();
    log.info("查詢使用者角色資訊耗時:" + (endTime - startTime) / 1000 + "秒");
    return roleMap;
    }
    };
    Future amountFuture = executor.submit(amountCall);
    Future roleFuture = executor.submit(roleCall);
    log.info("金額查詢結果為:" + amountFuture.get());
    log.info("角色查詢結果為:" + roleFuture.get());
    long endTime = System.currentTimeMillis();
    log.info("總耗時:" + (endTime - startTime) / 1000 + "秒");
    }





    結果:

    圖片

    CompletableFuture使用場景

    圖片

    建立異步任務

    圖片

    CompletableFuture建立異步任務,一般有supplyAsync和runAsync兩個方法:

  • supplyAsync執行CompletableFuture任務,支持返回值。

  • runAsync執行CompletableFuture任務,沒有返回值。

  • supplyAsync方法

    //使用預設內建執行緒池ForkJoinPool.commonPool(),根據supplier構建執行任務
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    //自訂執行緒,根據supplier構建執行任務
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

    runAsync方法

    //使用預設內建執行緒池ForkJoinPool.commonPool(),根據runnable構建執行任務
    public static CompletableFuture<Void> runAsync(Runnable runnable) 
    //自訂執行緒,根據runnable構建執行任務
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

    程式碼演示:

     @Test
    // supplyAsync執行CompletableFuture任務,支持返回值
    public void defaultSupplyAsync() throws ExecutionException, InterruptedException {
    long startTime = System.currentTimeMillis();
    // 構建執行任務
    CompletableFuture<Map<String, Object>> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
    try {
    Thread.sleep(6000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    Map<String, Object> amountMap = new HashMap<String, Object>();
    amountMap.put("amount", 99);
    long endTime = System.currentTimeMillis();
    log.info("查詢金額資訊耗時:" + (endTime - startTime) / 1000 + "秒");
    return amountMap;
    });
    // 這行程式碼在這裏 則會進行6秒的阻塞 下面程式碼其他執行緒無法建立
    // 只能等這個執行緒6秒過後結束才能建立其他執行緒
    // Map<String, Object> userMap = userCompletableFuture.get();
    CompletableFuture<Map<String, Object>> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {
    try {
    Thread.sleep(5000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    Map<String, Object> roleMap = new HashMap<String, Object>();
    roleMap.put("name""管理員");
    return roleMap;
    });
    log.info("金額查詢結果為:" + amountCompletableFuture.join());
    log.info("角色查詢結果為:" + roleCompletableFuture.join());
    log.info("總耗時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    }
    @Test
    // supplyAsync執行CompletableFuture任務,支持返回值
    public void customSupplyAsync() throws ExecutionException, InterruptedException {
    // 自訂執行緒池
    ExecutorService executorService = Executors.newCachedThreadPool();
    long startTime = System.currentTimeMillis();
    CompletableFuture<Map<String, Object>> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
    try {
    Thread.sleep(6000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    Map<String, Object> amountMap = new HashMap<String, Object>();
    amountMap.put("amount", 99);
    long endTime = System.currentTimeMillis();
    log.info("查詢金額資訊耗時:" + (endTime - startTime) / 1000 + "秒");
    return amountMap;
    }, executorService);
    // 這行程式碼在這裏 則會進行6秒的阻塞 下面程式碼其他執行緒無法建立
    // 只能等這個執行緒6秒過後結束才能建立其他執行緒
    // Map<String, Object> userMap = userCompletableFuture.get();
    CompletableFuture<Map<String, Object>> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {
    try {
    Thread.sleep(5000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    Map<String, Object> roleMap = new HashMap<String, Object>();
    roleMap.put("name""管理員");
    return roleMap;
    }, executorService);
    log.info("金額查詢結果為:" + amountCompletableFuture.join());
    log.info("角色查詢結果為:" + roleCompletableFuture.join());
    // 執行緒池需要關閉
    executorService.shutdown();
    log.info("總耗時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    }
     @Test
    // runAsync執行CompletableFuture任務,沒有返回值
    public void defaultRunAsync() {
    long lordStartTime = System.currentTimeMillis();
    CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {
    long startTime = System.currentTimeMillis();
    try {
    Thread.sleep(3000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("執行金額增刪改操作用時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    });
    CompletableFuture<Void> roleCompletableFuture = CompletableFuture.runAsync(() -> {
    long startTime = System.currentTimeMillis();
    try {
    Thread.sleep(4000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("執行角色增刪改操作用時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    });
    log.info("金額查詢結果為:" + amountCompletableFuture.join());
    log.info("角色查詢結果為:" + roleCompletableFuture.join());
    log.info("總耗時:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
    }
    @Test
    // runAsync執行CompletableFuture任務,沒有返回值
    public void customRunAsync() {
    long lordStartTime = System.currentTimeMillis();
    ExecutorService executor = Executors.newCachedThreadPool();
    CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {
    long startTime = System.currentTimeMillis();
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("執行金額增刪改操作用時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    }, executor);
    CompletableFuture<Void> roleCompletableFuture = CompletableFuture.runAsync(() -> {
    long startTime = System.currentTimeMillis();
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("執行角色增刪改操作用時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    }, executor);
    log.info("金額查詢結果為:" + amountCompletableFuture.join());
    log.info("角色查詢結果為:" + roleCompletableFuture.join());
    // 關閉執行緒池
    executor.shutdown();
    log.info("總耗時:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
    }











    註意:這裏的get()與join()都是獲取任務執行緒的返回值。join()方法丟擲的是uncheck異常(即RuntimeException),不會強制開發者丟擲, 會將異常包裝成 CompletionException 異常 / CancellationException 異常,但是本質原因還是程式碼記憶體在的真正的異常;

    get()方法丟擲的是經過檢查的異常, ExecutionException , InterruptedException 需要使用者手動處理(丟擲或者 try catch)。

    異步任務回呼

    圖片

    thenRun / thenRunAsync

    CompletableFuture的thenRun方法,通俗點講就是,做完第一個任務後,再做第二個任務。某個任務執行完成後,執行回呼方法;但是前後兩個任務沒有參數傳遞,第二個任務也沒有返回值。

    public CompletableFuture<Void> thenRun(Runnable action);
    public CompletableFuture<Void> thenRunAsync(Runnable action);

    thenRun / thenRunAsync的區別? 源碼解釋:

    private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
    }
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
    }

    如果你執行第一個任務的時候,傳入了一個自訂執行緒池:

  • 呼叫thenRun方法執行第二個任務時,則第二個任務和第一個任務是共用同一個執行緒池。

  • 呼叫thenRunAsync執行第二個任務時,則第一個任務使用的是你自己傳入的執行緒池,第二個任務使用的是ForkJoin執行緒池。

  • 後面介紹的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它們之間的區別也是這個哈!

    程式碼演示:

    @Test
    // 執行第一個任務後 可以繼續執行第二個任務 兩個任務之間無傳參 無返回值
    public void defaultThenRun() throws ExecutionException, InterruptedException {
    long lordStartTime = System.currentTimeMillis();
    CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {
    long startTime = System.currentTimeMillis();
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("執行金額增刪改操作用時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    });
    CompletableFuture<Void> thenCompletableFuture = amountCompletableFuture.thenRun(() -> {
    long startTime = System.currentTimeMillis();
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("執行角色增刪改操作用時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    });
    thenCompletableFuture.get();
    log.info("總耗時:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
    }

    結果:

    圖片

    thenAccept / thenAcceptAsync

    CompletableFuture的thenAccept方法表示,第一個任務執行完成後,執行第二個回呼方法任務,會將該任務的執行結果,作為入參,傳遞到回呼方法中,但是回呼方法是沒有返回值的。


    public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);

    程式碼演示:

    @Test
    // 執行第一個任務後 可以繼續執行第二個任務 並攜帶第一個任務的返回值 第二個任務執行完沒有返回值
    public void defaultThenAccept() throws ExecutionException, InterruptedException {
    long lordStartTime = System.currentTimeMillis();
    CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
    long startTime = System.currentTimeMillis();
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    Map<String, Object> amountMap = new HashMap<String, Object>();
    amountMap.put("amount", 90);
    log.info("執行金額查詢操作用時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    return amountMap;
    });
    CompletableFuture<Void> thenCompletableFuture = amountCompletableFuture.thenAccept((map) -> {
    long startTime = System.currentTimeMillis();
    if (Integer.parseInt(map.get("amount").toString()) > 90) {
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("金額充足,可以購買!:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    else {
    log.info("金額不足,無法購買!:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    }
    });
    thenCompletableFuture.get();
    log.info("總耗時:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
    }

    結果:

    圖片

    thenApply / thenApplyAsync

    CompletableFuture的thenApply方法表示,第一個任務執行完成後,執行第二個回呼方法任務,會將該任務的執行結果,作為入參,傳遞到回呼方法中,並且回呼方法是有返回值的。

    public <U> CompletableFuture<U> thenApplyAsync();
    public CompletableFuture<Void> thenAccept(Consumer<? super T> action);

    程式碼演示:

    @Test
    // 執行第一個任務後 可以繼續執行第二個任務 並攜帶第一個任務的返回值 第二個任務執行完有返回值
    public void defaultThenApply() throws ExecutionException, InterruptedException {
    long lordStartTime = System.currentTimeMillis();
    CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
    long startTime = System.currentTimeMillis();
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    Map<String, Object> amountMap = new HashMap<String, Object>();
    amountMap.put("amount", 90);
    log.info("執行金額查詢操作用時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    return amountMap;
    });
    CompletableFuture<Integer> thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {
    int number = 0;
    if (Integer.parseInt(map.get("amount").toString()) > 3) {
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    // 可口可樂3元一瓶 看金額一共能購買多少瓶
    number = Integer.parseInt(map.get("amount").toString()) / 3;
    }
    return number;
    });
    log.info("當前金額一共可以買" + thenCompletableFuture.get() + "瓶可口可樂!");
    Integer integer = thenCompletableFuture.get();
    log.info("總耗時:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
    }

    結果:

    圖片

    exceptionally

    CompletableFuture的exceptionally方法表示,某個任務執行異常時,執行的回呼方法;並且有丟擲異常作為參數,傳遞到回呼方法。

    public CompletableFuture<T> exceptionally(
    Function<Throwable, ? extends T> fn) {
    return uniExceptionallyStage(fn);
    }

    程式碼演示:

     @Test
    // 某個任務執行異常時,執行的回呼方法;並且有丟擲異常作為參數,傳遞到回呼方法。
    public void exceptionally() throws ExecutionException, InterruptedException {
    long lordStartTime = System.currentTimeMillis();
    CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
    long startTime = System.currentTimeMillis();
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    Map<String, Object> amountMap = new HashMap<String, Object>();
    amountMap.put("amount", 90);
    log.info("執行金額查詢操作用時:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
    return amountMap;
    });
    CompletableFuture<Integer> thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {
    int number = 0;
    if (Integer.parseInt(map.get("amount").toString()) > 3) {
    try {
    Thread.sleep(1000);
    // 可口可樂3元一瓶 看金額一共能購買多少瓶
    number = Integer.parseInt(map.get("amount").toString()) / 0;
    } catch (ArithmeticException | InterruptedException e) {
    e.printStackTrace();
    throw new ArithmeticException(); // 這裏一定要將異常拋除了,不然exceptionally無效
    }
    }
    return number;
    });
    CompletableFuture<Integer> exceptionFuture = thenCompletableFuture.exceptionally((e) -> {
    log.error("除數為0,則預設商為0!");
    return 0;
    });
    log.info("當前金額一共可以買" + thenCompletableFuture.get() + "瓶可口可樂!");
    exceptionFuture.get();
    log.info("總耗時:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
    }

    圖片

    註意:這裏的異常一定要丟擲來,不然exceptionally無效!

    whenComplete

    CompletableFuture的whenComplete方法表示,某個任務執行完成後,執行的回呼方法,無返回值;並且whenComplete方法返回的CompletableFuture的result是上個任務的結果。

    public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
    }

    程式碼演示:

    @Test
    // 某個任務執行完成後,執行的回呼方法,無返回值;並且whenComplete方法返回的CompletableFuture的result是上個任務的結果。
    public void whenComplete() {
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
    return"周杰倫";
    });
    CompletableFuture<String> stringCompletableFuture1 = stringCompletableFuture.whenComplete((a, throwable) -> {
    log.info("周杰倫喜歡唱");
    });
    log.info("輸出結果為第一個任務:" + stringCompletableFuture1.join());
    }

    結果:

    圖片

    handle

    CompletableFuture的handle方法表示,某個QQ帳號買號平台地圖任務執行完成後,執行回呼方法,並且是有返回值的;並且handle方法返回的CompletableFuture的result是回呼方法執行的結果。

    public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
    }

    程式碼演示:

    @Test
    // 某個任務執行完成後,執行的回呼方法,有返回值;並且handle方法返回的CompletableFuture的result是第二個任務的結果。
    public void handle() {
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
    return"周杰倫";
    });
    CompletableFuture<String> stringCompletableFuture1 = stringCompletableFuture.handle((a, throwable) -> {
    return"周杰倫喜歡唱歌!";
    });
    log.info("輸出結果為第二個任務:" + stringCompletableFuture1.join());
    }

    結果:

    圖片

    多個任務組合處理

    圖片

    AND組合關系

    thenCombine / thenAcceptBoth / runAfterBoth都表示:將兩個CompletableFuture組合起來,只有這兩個都正常執行完了,才會執行某個任務。

  • thenCombine:會將兩個任務的執行結果作為方法入參,傳遞到指定方法中,且有返回值。

  • thenAcceptBoth: 會將兩個任務的執行結果作為方法入參,傳遞到指定方法中,且無返回值。

  • runAfterBoth 不會把執行結果當做方法入參,且沒有返回值。

  • 程式碼演示:

    @Test
    public void thenCombine() {
    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
    return 7;
    });
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2).thenCombine(first, Integer::sum);
    log.info("結果為:" + second.join());
    }

    結果為:

    圖片

    OR組合關系

    applyToEither / acceptEither / runAfterEither 都表示:將兩個CompletableFuture組合起來,只要其中一個執行完了,就會執行某個任務。

  • applyToEither:會將已經執行完成的任務,作為方法入參,傳遞到指定方法中,且有返回值。

  • acceptEither: 會將已經執行完成的任務,作為方法入參,傳遞到指定方法中,且無返回值。

  • runAfterEither:不會把執行結果當做方法入參,且沒有返回值。

  • 程式碼演示:

    @Test
    public void applyToEither1() {
    log.info("魏凱下班準備回家。。。");
    log.info("魏凱等待2號,4號地鐵。。。");
    CompletableFuture<String> busCF = CompletableFuture.supplyAsync(() -> {
    log.info("2號在路上。。。");
    try {
    Thread.sleep(3000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return"2";
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
    log.info("4號地鐵在路上。。。");
    try {
    Thread.sleep(4000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return"4";
    }), first -> first + "號");
    log.info("魏凱坐上" + busCF.join() + "地鐵");
    }
    @Test
    // OR
    public void applyToEither() {
    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
    try {
    Thread.sleep(2000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return 7;
    });
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> {
    try {
    Thread.sleep(3000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return 7;
    }).applyToEither(first, num -> num);
    log.info("最後結果為:" + second.join());
    }

    結果演示:

    圖片

    AllOf

    所有任務都執行完成後,才執行 allOf返回的CompletableFuture。如果任意一個任務異常,allOf的CompletableFuture,執行get方法,會丟擲異常。

    程式碼演示:

    @Test
    // 所有任務都執行完成後,才執行 allOf返回的CompletableFuture。如果任意一個任務異常,allOf的CompletableFuture,執行get方法,會丟擲異常。
    // 這裏第一次執行沒有睡眠的話,是可以直接執行第三個任務的。如果有睡眠,則需要手動join啟動。
    public void allOf() {
    CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
    // try {
    // Thread.sleep(2000);
    // } catch (InterruptedException e) {
    // e.printStackTrace();
    // }
    log.info("第一個任務執行完成!");
    });
    CompletableFuture<Void> second = CompletableFuture.runAsync(() -> {
    // try {
    // Thread.sleep(500);
    // } catch (InterruptedException e) {
    // e.printStackTrace();
    // }
    log.info("第二個任務執行完成!");
    });
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(first, second).whenComplete((m, n) -> {
    log.info("第三個任務完成!");
    });
    // voidCompletableFuture.join();
    }

    結果:

    圖片

    註意:這裏第一次啟動執行沒有睡眠的話,是可以直接執行第三個任務的,因為這兩個任務都執行完成,啟動的瞬間第三個也同時執行完。如果有睡眠,則需要手動join啟動,等待最長睡眠任務時間過後,第三個任務完成!

    AnyOf

    任意一個任務執行完,就執行anyOf返回的CompletableFuture。如果執行的任務異常,anyOf的CompletableFuture,執行get方法,會丟擲異常。

    程式碼演示:

    @Test
    // 前提任務任意執行完一個,則目標任務執行。其他前提任務則不在執行。
    // 任意一個任務執行完,就執行anyOf返回的CompletableFuture。如果執行的任務異常,anyOf的CompletableFuture,執行get方法,會丟擲異常。
    public void anyOf() {
    CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("第一個任務執行完成!");
    });
    CompletableFuture<Void> second = CompletableFuture.runAsync(() -> {
    try {
    Thread.sleep(500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("第二個任務執行完成!");
    });
    CompletableFuture<Object> voidCompletableFuture = CompletableFuture.anyOf(first, second).whenComplete((m, n) -> {
    log.info("第三個任務完成!");
    });
    voidCompletableFuture.join();
    }

    結果:

    圖片

    thenCompose

    thenCompose方法會在某個任務執行完成後,將該任務的執行結果,作為方法入參,去執行指定的方法。該方法會返回一個新的CompletableFuture例項。

  • 如果該CompletableFuture例項的result不為null,則返回一個基於該result新的CompletableFuture例項。

  • 如果該CompletableFuture例項為null,然後就執行這個新任務。

  • 程式碼演示:

    @Test
    public void thenCompose1() {
    CompletableFuture<Integer> stringCompletableFuture = CompletableFuture.supplyAsync(() -> 4)
    .thenCompose(value -> CompletableFuture.supplyAsync(() -> {
    // thenCompose方法返回一個新的CompletableFuture
    if (Integer.valueOf(4).equals(value)) {
    return 66;
    else {
    return 99;
    }
    }));
    log.info("結果:" + stringCompletableFuture.join());
    }
    @Test
    public void thenCompose() {
    CompletableFuture<String> first = CompletableFuture.completedFuture("第一個任務");
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> 4)
    .thenCompose((data) -> {
    log.info("data為:" + data);
    return first;
    });
    log.info("結果:" + stringCompletableFuture.join());
    }

    結果:

    圖片

    CompletableFuture註意點

    圖片

    Future需要獲取返回值,才能獲取異常資訊

    @Test
    public void futureTest(){
    ExecutorService executor = Executors.newFixedThreadPool(2);
    CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
    int m = 9;
    int n = 0;
    return m / n;
    },executor);
    // integerCompletableFuture.join(); // 這行程式碼不加,則不會丟擲異常
    }

    Future需要獲取返回值,才能獲取到異常資訊。如果不加 get()/join()方法,看不到異常資訊。小夥伴們使用的時候,註意一下哈,考慮是否加try…catch…或者使用exceptionally方法。

    CompletableFuture的get()方法是阻塞的

    //反例
     CompletableFuture.get();
    //正例
    CompletableFuture.get(9, TimeUnit.SECONDS);

    CompletableFuture的get()方法是阻塞的,如果使用它來獲取異步呼叫的返回值,需要添加超時時間

    預設執行緒池的註意點

    CompletableFuture程式碼中又使用了預設的執行緒池,處理的執行緒個數是電腦CPU核數-1。在大量請求過來的時候,處理邏輯復雜的話,響應會很慢。一般建議使用自訂執行緒池,最佳化執行緒池配置參數。

    自訂執行緒池時,註意飽和策略

    CompletableFuture的get()方法是阻塞的,我們一般建議使用 future.get(3, TimeUnit.SECONDS) 。並且一般建議使用自訂執行緒池。但是如果執行緒池拒絕策略是DiscardPolicy或者DiscardOldestPolicy,當執行緒池飽和時,會直接丟棄任務,不會拋棄異常。

    因此建議,CompletableFuture執行緒池策略最好使用AbortPolicy,然後耗時的異步執行緒,做好執行緒池隔離!

    <END>