# CompletableFuture实践
# CompletableFuture相关的类图
有了类图,就能从全局层面对CompletableFuture建立整体概念。
# 使用案例
# 创建一个完成的completedFuture
// A future obtained from completedFuture is already done and carries the given value.
CompletableFuture<String> cf = CompletableFuture.completedFuture("message");
assertTrue(cf.isDone());
// getNow returns the completed value; the null argument is only the fallback for a pending future.
assertEquals("message", cf.getNow(null));
# 执行一个异步任务
// runAsync executes the Runnable asynchronously and produces no value, hence CompletableFuture<Void>.
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
    // The task asserts that it runs on a daemon thread.
    assertTrue(Thread.currentThread().isDaemon());
    randomSleep();
});
// NOTE(review): this relies on randomSleep() (not shown here) keeping the task busy long enough.
assertFalse(cf.isDone());
// After waiting via sleepEnough() (not shown here) the task is expected to be finished.
sleepEnough();
assertTrue(cf.isDone());
# 在前一个阶段上应用函数
thenApply会同步执行传入的函数:如果前一阶段已经完成,函数直接在调用thenApply的当前线程中执行;否则由完成前一阶段的线程执行。
// The source future is already complete, so the function has run by the time thenApply returns.
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(s -> {
    // Not a pool worker: the function asserts it runs on a non-daemon thread.
    assertFalse(Thread.currentThread().isDaemon());
    return s.toUpperCase();
});
// The transformed value is available immediately.
assertEquals("MESSAGE", cf.getNow(null));
# 在前一个阶段上异步应用函数
thenApplyAsync默认使用ForkJoinPool.commonPool()来异步执行传入的函数。
// Without an explicit executor, thenApplyAsync hands the function to the default async executor.
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
    // The function asserts that it runs on a daemon thread.
    assertTrue(Thread.currentThread().isDaemon());
    randomSleep();
    return s.toUpperCase();
});
// NOTE(review): expects the stage to still be pending here, which depends on randomSleep() (not shown).
assertNull(cf.getNow(null));
// join() blocks until the async stage has produced its value.
assertEquals("MESSAGE", cf.join());
thenApplyAsync支持自定义Executor来异步执行
// The two-argument overload runs the function on the supplied executor instead of the default one.
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
    // `executor` is defined outside this snippet; its threads are expected to be named
    // "custom-executor-*" and to be non-daemon, as asserted here.
    assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
    assertFalse(Thread.currentThread().isDaemon());
    randomSleep();
    return s.toUpperCase();
}, executor);
// NOTE(review): expects the stage to still be pending here, which depends on randomSleep() (not shown).
assertNull(cf.getNow(null));
// join() blocks until the async stage has produced its value.
assertEquals("MESSAGE", cf.join());
# 消费前一阶段的结果
thenAccept与thenApply类似,但它接收的是Consumer,只消费前一阶段的结果,没有返回值。
// thenAccept passes the completed value to a consumer; here the consumer collects it in a buffer.
StringBuilder result = new StringBuilder();
CompletableFuture<String> source = CompletableFuture.completedFuture("thenAccept message");
// The source is already complete, so the consumer has run by the time thenAccept returns.
source.thenAccept(result::append);
assertTrue("Result was empty", result.length() > 0);
thenAcceptAsync支持异步执行
// thenAcceptAsync runs the consumer asynchronously; the resulting stage carries no value.
StringBuilder result = new StringBuilder();
CompletableFuture<Void> cf = CompletableFuture.completedFuture("thenAcceptAsync message")
        .thenAcceptAsync(s -> result.append(s));
// Wait for the consumer to finish before inspecting the buffer.
cf.join();
assertTrue("Result was empty", result.length() > 0);
# 异常处理
/**
 * Demonstrates completing a pending future exceptionally and recovering
 * from the failure in a dependent {@code handle} stage.
 */
public void test() {
    // sleepEnough() (helper not shown here) is expected to keep this stage pending
    // until it is completed exceptionally below.
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        sleepEnough();
        return s.toUpperCase();
    });
    // handle runs for both outcomes: a failure is mapped to a fallback message, a normal result to "".
    CompletableFuture<String> exceptionHandler =
            cf.handle((s, th) -> (th != null) ? "message upon cancel" : "");
    // If cf has not completed yet, this forces it to complete with the given exception,
    // so that get()/join() on cf will throw from now on.
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        // join() throws because cf was completed exceptionally above, not because the task is slow.
        cf.join();
        fail("Should have thrown an exception");
    } catch (CompletionException ex) { // just for testing
        // join() wraps the original exception; its cause carries the message set above.
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    // The dependent handle stage observed the failure and produced the fallback value.
    assertEquals("message upon cancel", exceptionHandler.join());
}
# 取消计算
/**
 * Demonstrates cancelling a pending stage and supplying a fallback value
 * through {@code exceptionally}.
 */
public void test() {
    // The delayed executor postpones the function by one second, so cf is still pending when cancelled.
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    // exceptionally supplies a replacement value when cf ends with an exception (including cancellation).
    CompletableFuture<String> cf2 = cf.exceptionally(throwable -> "canceled message");
    // cancel returns true when the future is now cancelled.
    assertTrue("Was not canceled", cf.cancel(true));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    assertEquals("canceled message", cf2.join());
}
# 在两个完成的阶段其中之一上做处理
应用函数
/**
 * Demonstrates {@code applyToEither}: the function is applied to the result
 * of whichever of the two stages completes first.
 */
public void test() {
    String original = "Message";
    CompletableFuture<String> cf1 = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s));
    // Typed as CompletableFuture<String> so that join() yields a String and endsWith() compiles.
    CompletableFuture<String> cf2 = cf1.applyToEither(
            CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
            s -> s + " from applyToEither");
    // Either stage may win, so only the suffix added by the function is checked.
    assertTrue(cf2.join().endsWith(" from applyToEither"));
}
private String delayedLowerCase(String s) {
调用消费函数
/**
 * Demonstrates {@code acceptEither}: the consumer receives the result of
 * whichever of the two stages completes first.
 */
public void test() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    // acceptEither yields a stage without a value, hence CompletableFuture<Void>.
    CompletableFuture<Void> cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    s -> result.append(s).append("acceptEither"));
    // Wait for the consumer to run before inspecting the buffer.
    cf.join();
    // Either stage may win, so only the suffix appended by the consumer is checked.
    assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
}
# 在两个阶段都执行完后
运行一个Runnable
/**
 * Runs a {@code Runnable} once two stages have both completed.
 * Both stages use the non-async {@code thenApply} on already-completed
 * futures, so the action has run by the time the assertion is evaluated.
 */
static void runAfterBothExample() {
    final String input = "Message";
    final StringBuilder buffer = new StringBuilder();
    CompletableFuture<String> upper = CompletableFuture.completedFuture(input).thenApply(String::toUpperCase);
    CompletableFuture<String> lower = CompletableFuture.completedFuture(input).thenApply(String::toLowerCase);
    // The action ignores both results and only records that it ran.
    upper.runAfterBoth(lower, () -> buffer.append("done"));
    assertTrue("Result was empty", buffer.length() > 0);
}
使用BiConsumer处理两个阶段的结果
/**
 * Feeds the results of two completed stages into a {@code BiConsumer}.
 * The first argument is the result of the stage the method is called on,
 * the second is the result of the other stage.
 */
static void thenAcceptBothExample() {
    final String input = "Message";
    final StringBuilder buffer = new StringBuilder();
    CompletableFuture<String> upper = CompletableFuture.completedFuture(input).thenApply(String::toUpperCase);
    CompletableFuture<String> lower = CompletableFuture.completedFuture(input).thenApply(String::toLowerCase);
    // Append the upper-cased text followed by the lower-cased text.
    upper.thenAcceptBoth(lower, (upperText, lowerText) -> buffer.append(upperText).append(lowerText));
    assertEquals("MESSAGEmessage", buffer.toString());
}
使用BiFunction处理两个阶段的结果
/**
 * Combines the results of two stages with a {@code BiFunction}.
 * Both stages use the non-async {@code thenApply} on already-completed
 * futures, so the combined value can be read with {@code getNow}.
 */
static void thenCombineExample() {
    String original = "Message";
    CompletableFuture<String> cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.getNow(null));
}
异步使用BiFunction处理两个阶段的结果
/**
 * Combines the results of two asynchronously computed stages with a
 * {@code BiFunction}. Because both inputs are produced by
 * {@code thenApplyAsync}, the result is awaited with {@code join()}.
 */
static void thenCombineAsyncExample() {
    String original = "Message";
    CompletableFuture<String> cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.join());
}
组合 CompletableFuture
/**
 * Chains two stages with {@code thenCompose}: the function receives the
 * first stage's result and returns a new future whose outcome becomes the
 * outcome of the composed stage.
 */
static void thenComposeExample() {
    String original = "Message";
    CompletableFuture<String> cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            // `upper` is the first stage's result; the inner future appends the lower-cased text to it.
            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                    .thenApply(s -> upper + s));
    assertEquals("MESSAGEmessage", cf.join());
}
# 当几个阶段中的一个完成,创建一个完成的阶段
/**
 * Creates a stage that completes as soon as any one of several stages
 * completes, and records that stage's result.
 */
static void anyOfExample() {
    StringBuilder result = new StringBuilder();
    // Element types are spelled out: with a raw List the stream pipeline below loses its types.
    List<String> messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture<String>> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    // anyOf yields CompletableFuture<Object>, so the result has to be cast back to String.
    CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
        if (th == null) {
            assertTrue(isUpperCase((String) res));
            result.append(res);
        }
    });
    // All stages use the non-async thenApply, so the callback has already run here.
    assertTrue("Result was empty", result.length() > 0);
}
# 当所有的阶段都完成后创建一个阶段
同步
/**
 * Creates a stage that completes once all given stages have completed
 * (synchronous variant).
 */
static void allOfExample() {
    StringBuilder result = new StringBuilder();
    // Element types are spelled out: with raw types cf.getNow(null) below would be an Object.
    List<String> messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture<String>> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    // allOf carries no value of its own; the individual results are read from the source futures.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
        futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
        result.append("done");
    });
    // All stages use the non-async thenApply, so the callback has already run here.
    assertTrue("Result was empty", result.length() > 0);
}
异步
/**
 * Creates a stage that completes once all given stages have completed
 * (asynchronous variant: each input is produced by {@code thenApplyAsync}).
 */
static void allOfAsyncExample() {
    StringBuilder result = new StringBuilder();
    // Element types are spelled out: with raw types cf.getNow(null) below would be an Object.
    List<String> messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture<String>> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    // allOf carries no value of its own; the individual results are read from the source futures.
    CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    // The inputs are computed asynchronously, so wait for the callback before checking the buffer.
    allOf.join();
    assertTrue("Result was empty", result.length() > 0);
}
# CompletionService
参考:请按到场顺序发言之CompletionService详解
CompletionService使用案例:
/**
 * Submits one task per fruit, each sleeping for a different number of
 * seconds, and prints the results in the order in which the tasks finish.
 *
 * Exactly as many results are taken as tasks were submitted: a further
 * {@code take()} would block forever because no more tasks will complete.
 * The pool is shut down afterwards so that its non-daemon worker threads
 * do not keep the JVM alive.
 *
 * @param args unused
 * @throws InterruptedException if interrupted while waiting for a result
 * @throws ExecutionException if one of the tasks failed
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    try {
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        String[] fruits = {"苹果", "梨", "葡萄", "桃"};
        for (String fruit : fruits) {
            service.submit(() -> {
                // Each fruit takes a different time to "wash".
                long seconds;
                switch (fruit) {
                    case "苹果":
                        seconds = 6;
                        break;
                    case "梨":
                        seconds = 1;
                        break;
                    case "葡萄":
                        seconds = 10;
                        break;
                    case "桃":
                        seconds = 3;
                        break;
                    default:
                        seconds = 0;
                        break;
                }
                TimeUnit.SECONDS.sleep(seconds);
                return "洗干净的" + fruit;
            });
        }
        // take() hands out futures in completion order; stop after the last submitted task.
        for (int taken = 0; taken < fruits.length; taken++) {
            String result = service.take().get();
            System.out.println("吃掉" + result);
        }
    } finally {
        pool.shutdown();
    }
}