一、 CompletableFuture介绍
多线程开发一般使用Runnable,Callable,Thread,FutureTask,ThreadPoolExecutor,但也有不近如意的地方
Thread + Runnable:执行异步任务,没有返回结果。
Thread + Callable + FutureTask:执行一步任务,有返回结果。
获取返回结果,基于get方法获取,线程需要挂起在WaitNode里。
获取返回结果,基于isDone判断任务的状态,但是这里需要不断轮询。
上述的方式都是有一定的局限性的
CompletableFuture是Java 8中引入的一种实现异步编程模型的方式,它是Future的扩展,提供了更强大、更灵活的功能。CompletableFuture可以表示两个异步任务之间的顺序关系或并行关系,同时提供了一些方便的工具方法来组合这些关系。
二、 应用
首先对CompletableFuture提供的函数式编程中三个函数有一个掌握
Supplier 生产者,没有入参,有返回结果
Consumer 消费者,有入参,但是没有返回结果
Function<T,U> 函数,有入参,又有返回结果
2.1 功能
并行执行
allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture 。
anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFutur。
依赖关系
thenApply():把前面任务的执行结果,交给后面的Function。
thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回。
or聚合关系
applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值 。
acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值 。
runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务。
and集合关系
thenCombine():合并任务,有返回值 。
thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值 。
runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务。
结果处理
whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作。
exceptionally:返回一个新的CompletableFuture,当前面的CompletableFuture完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture的完成。
2.2 supplyAsync
CompletableFuture如果不提供线程池的话,默认使用的ForkJoinPool,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) {
CompletableFuture firstTask = CompletableFuture.supplyAsync(() -> {
System.out.println(“task begin!”);
System.out.println(“task end!”);
return “result”;
});
String result1 = firstTask.join();
String result2 = null;
try {result2 = firstTask.get();
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}System.out.println(result1 + "," + result2);
}
2.3 runAsync
当前方式既不会接收参数,也不会返回任何结果,非常基础的任务编排方式
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) throws IOException {
CompletableFuture.runAsync(() -> {
System.out.println(“task begin!”);
System.out.println(“task end!”);
});
System.in.read();
}
2.4 thenApply,thenApplyAsync
有任务A,还有任务B。任务B需要在任务A执行完毕后再执行。而且任务B需要任务A的返回结果。
任务B自身也有返回结果。thenApply可以拼接异步任务,前置任务处理完之后,将返回结果交给后置任务,然后后置任务再执行thenApply提供了带有Async的方法,可以指定每个任务使用的具体线程池。
thenApply:
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) {
CompletableFuture taskA = CompletableFuture.supplyAsync(() -> {
String id = UUID.randomUUID().toString();
System.out.println(“taskA:” + id);
return id;
});
CompletableFuture taskB = taskA.thenApply(result -> {
System.out.println(“taskA resule:” + result);
result = result.replace(“-”, “”);
return result;
});
System.out.println("main task deal result:" + taskB.join());
}
thenApplyAsync:
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture taskB = CompletableFuture.supplyAsync(() -> {
String id = UUID.randomUUID().toString();
System.out.println(“taskA:” + id + “,” + Thread.currentThread().getName());
return id;
}).thenApplyAsync(result -> {
System.out.println(“taskB get taskA result:” + result + “,” + Thread.currentThread().getName());
result = result.replace(“-”, “”);
return result;
},executor);
System.out.println("main thread:" + taskB.join());
}
2.5 thenAccept,thenAcceptAsync(自定义线程池)
套路和thenApply一样,都是任务A和任务B的拼接前置任务需要有返回结果,后置任务会接收前置任务的结果,返回后置任务,没有返回值
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) throws IOException {
CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA”);
return “abcdefg”;
}).thenAccept(result -> {
System.out.println(“taskB,get result:” + result);
});
System.in.read();
}
2.6 thenRun,thenRunAsync(自定义线程池)
套路和thenApply,thenAccept一样,都是任务A和任务B的拼接,前置任务没有返回结果,后置任务不接收前置任务结果,后置任务也没有返回结果。
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) throws IOException {
CompletableFuture.runAsync(() -> {
System.out.println(“taskA begin!”);
}).thenRun(() -> {
System.out.println(“taskB begin!”);
});
System.in.read();
}
2.7 thenCombine,thenAcceptBoth,runAfterBoth
比如有任务A,任务B,任务C。任务A和任务B并行执行,等到任务A和任务B全部执行完毕后,再执行任务C。
2.7.1 thenCombine
当前方式当前方式前置任务需要有返回结果,后置任务接收前置任务的结果,有返回值
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 10;
}), (r1, r2) -> {
System.out.println(“taskC begin!!”);
return r1 + r2;
});
System.out.println("taskC result = " + future.join());
}
2.7.2 thenAcceptBoth
当前方式前置任务需要有返回结果,后置任务接收前置任务的结果,没有返回值
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 10;
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 10;
}), (r1, r2) -> {
System.out.println(“taskC begin!!”);
int r = r2 + r1;
System.out.println("taskC result = " + r);
});
}
2.7.3 runAfterBoth
当前方式前置任务不需要有返回结果,后置任务不会接收前置任务的结果,没有返回值
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 10;
}).runAfterBoth(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 10;
}), () -> {
System.out.println(“taskC begin!!”);
});
}
2.8 applyToEither,acceptEither,runAfterEither
这三个方法:比如有任务A,任务B,任务C。任务A和任务B并行执行,只要任务A或者任务B执行完毕,开始执行任务C.
applyToEither:可以接收结果并且返回结果,acceptEither:可以接收结果没有返回结果,runAfterEither:不接收结果也没返回结果,三个方法拼接任务的方式都是一样的,applyToEither:只演示一个其它套路一样。
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) throws IOException {
CompletableFuture taskC = CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 78;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 66;
}), resultFirst -> {
System.out.println(“taskC begin!!”);
return resultFirst;
});
System.out.println(taskC.join());
System.in.read();
}
2.9 exceptionally,thenCompose,handle
exceptionally:
这个也是拼接任务的方式,但是只有前面业务执行时出现异常了,才会执行当前方法来处理.
只有异常出现时,CompletableFuture的编排任务没有处理完时,才会触发。
拿不到任务结果。
whenComplete,handle:
这两个也是异常处理的套路,可以根据方法描述发现,他的功能方向比exceptionally要更加丰富
whenComplete:
可以拿到返回结果同时也可以拿到出现的异常信息,但是whenComplete本身是Consumer不能返回结果。无法帮你捕获异常,但是可以拿到异常返回的结果。
handle:
可以拿到返回结果同时也可以拿到出现的异常信息,并且也可以指定返回托底数据。可以捕获异常的,异常不会抛出去。
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) throws IOException {
CompletableFuture taskC = CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 78;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 66;
}), resultFirst -> {
System.out.println(“taskC begin!!”);
return resultFirst;
}).handle((r,ex) -> {
System.out.println(“handle:” + r);
System.out.println(“handle:” + ex);
return -1;
});
/.exceptionally(ex -> {
System.out.println(“exceptionally:” + ex);
return -1;
});/
/.whenComplete((r,ex) -> {
System.out.println(“whenComplete:” + r);
System.out.println(“whenComplete:” + ex);
});/
System.out.println(taskC.join());
}
2.10 allOf,anyOf
2.10.1 allOf
allOf的方式是让内部编写多个CompletableFuture的任务,多个任务都执行完后,才会继续执行你后续拼接的任务。
allOf返回的CompletableFuture是Void,没有返回结果
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) throws IOException {
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskA begin!!”);
}),
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskB begin!!”);
}),
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskC begin!!”);
})
).thenRun(() -> {
System.out.println(“taskD begin!!”);
});
}
2.10.2 anyOf:
anyOf是基于多个CompletableFuture的任务,只要有一个任务执行完毕就继续执行后续,最先执行完的任务做作为返回结果的入参
java 体验AI代码助手 代码解读复制代码 public static void main(String[] args) throws IOException {
CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskA begin!!”);
return “A”;
}),
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskB begin!!”);
return “B”;
}),
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskC begin!!”);
return “C”;
})
).thenAccept(r -> {
System.out.println(“taskD begin,” + r + “first end”);
});
}