# CompletableFuture实践

# CompletableFuture相关的类图

有了类图能在全局层面上对CompletableFuture有概念了。

# 使用案例

20个使用案例 (opens new window)

# 创建一个完成的completedFuture

CompletableFuture cf = CompletableFuture.completedFuture("message");
assertTrue(cf.isDone());
assertEquals("message", cf.getNow(null));

# 执行一个异步任务

CompletableFuture cf = CompletableFuture.runAsync(() -> {
    assertTrue(Thread.currentThread().isDaemon());
    randomSleep();
});
assertFalse(cf.isDone());
sleepEnough();
assertTrue(cf.isDone());

# 在前一个阶段上应用函数

thenApply是同步的方法,会阻塞执行。

CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
    assertFalse(Thread.currentThread().isDaemon());
    return s.toUpperCase();
});
assertEquals("MESSAGE", cf.getNow(null));

# 在前一个阶段上异步应用函数

thenApplyAsync是利用ForkJoinPool来异步执行

CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
    assertTrue(Thread.currentThread().isDaemon());
    randomSleep();
    return s.toUpperCase();
});
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join());

thenApplyAsync支持自定义Executor来异步执行

CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
    assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
    assertFalse(Thread.currentThread().isDaemon());
    randomSleep();
    return s.toUpperCase();
}, executor);

assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join());

# 消费前一阶段的结果

跟thenApply相比,没有返回值。

StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture("thenAccept message")
    .thenAccept(s -> result.append(s));
assertTrue("Result was empty", result.length() > 0);

thenAcceptAsync支持异步执行

StringBuilder result = new StringBuilder();
CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
    .thenAcceptAsync(s -> result.append(s));
cf.join();
assertTrue("Result was empty", result.length() > 0);

# 异常处理

public void test() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync((s) -> {
        sleepEnough();
        return s.toUpperCase();
    });
    // handle 处理异常,在th!=null时会返回对应内容
    CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
    // 若尚未complete,就调用get()等方法获取结果,便会执行当前方法,表示以异常的方式complete
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));
    // 以异常的方式complete则返回true
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        // 阻塞式获取结果,由于异步方法中做了sleep,无法即时获取结果,因此属于异常场景
        cf.join();
        fail("Should have thrown an exception");
    } catch(CompletionException ex) { // just for testing
        // 检查是否与completeExceptionally()定义的效果一致
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    // 触发handle
    assertEquals("message upon cancel", exceptionHandler.join());
}

# 取消计算

public void test() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
    assertTrue("Was not canceled", cf.cancel(true));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    assertEquals("canceled message", cf2.join());
}

# 在两个完成的阶段其中之一上做处理

应用函数

public void test() {
    String original = "Message";
    CompletableFuture cf1 = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s));
    CompletableFuture cf2 = cf1.applyToEither(
            CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
            s -> s + " from applyToEither");
    assertTrue(cf2.join().endsWith(" from applyToEither"));
}
private String delayedLowerCase(String s) {

调用消费函数

public void test() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    s -> result.append(s).append("acceptEither"));
    cf.join();
    assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
}

# 在两个阶段都执行完后

运行一个Runnable

static void runAfterBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            () -> result.append("done"));
    assertTrue("Result was empty", result.length() > 0);
}

使用BiConsumer处理两个阶段的结果

static void thenAcceptBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            (s1, s2) -> result.append(s1 + s2));
    assertEquals("MESSAGEmessage", result.toString());
}

使用BiFunction处理两个阶段的结果

static void thenCombineExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.getNow(null));
}

异步使用BiFunction处理两个阶段的结果

static void thenCombineAsyncExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.join());
}

组合 CompletableFuture

static void thenComposeExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                    .thenApply(s -> upper + s));
    assertEquals("MESSAGEmessage", cf.join());
}

# 当几个阶段中的一个完成,创建一个完成的阶段

static void anyOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
        if(th == null) {
            assertTrue(isUpperCase((String) res));
            result.append(res);
        }
    });
    assertTrue("Result was empty", result.length() > 0);
}

# 当所有的阶段都完成后创建一个阶段

同步

static void allOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
        futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
        result.append("done");
    });
    assertTrue("Result was empty", result.length() > 0);
}

异步

static void allOfAsyncExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    allOf.join();
    assertTrue("Result was empty", result.length() > 0);
}

# CompletionService

请按到场顺序发言之CompletionService详解 (opens new window)

CompletionService使用案例:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    CompletionService<String> service = new ExecutorCompletionService<String>(pool);
    
  Stream.of("苹果", "梨", "葡萄", "桃")
            .forEach(fruit -> service.submit(() -> {
                        if(fruit.equals("苹果")){
                            TimeUnit.SECONDS.sleep(6);
                        }else if(fruit.equals("梨")){
                            TimeUnit.SECONDS.sleep(1);
                        }else if(fruit.equals("葡萄")){
                            TimeUnit.SECONDS.sleep(10);
                        }else if(fruit.equals("桃")){
                            TimeUnit.SECONDS.sleep(3);
                        }
                        return "洗干净的"+fruit;
                    })
            );

    String result;
    while((result=service.take().get())!=null){
        System.out.println("吃掉"+result);
    }
}
修改于: 8/11/2022, 3:17:56 PM