Callable<String>callable=newCallable<String>(){@OverridepublicStringcall()throwsException{// Perform some computation
Thread.sleep(2000);return"Return some result";}};
// Same task written as a lambda expression.
Callable<String> callable = () -> {
    // Perform some computation
    Thread.sleep(2000);
    return "Return some result";
};
Callable的定义如下:
1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
可以发现它是可以带返回值的,并且能够抛出异常。
Runnable接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see java.lang.Thread#run()
     */
    public abstract void run();
}
import java.util.concurrent.*;

/** Submits a Callable to a single-thread executor and blocks on the returned Future. */
public class FutureAndCallableExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Callable<String> callable = () -> {
                // Perform some computation
                System.out.println("Entered Callable");
                Thread.sleep(2000);
                return "Hello from Callable";
            };

            System.out.println("Submitting Callable");
            Future<String> future = executorService.submit(callable);

            // This line executes immediately
            System.out.println("Do something else while callable is getting executed");

            System.out.println("Retrieve the result of the future");
            // Future.get() blocks until the Future holds the returned result
            String result = future.get();
            System.out.println(result);
        } finally {
            // Shut the executor down even when get() throws.
            executorService.shutdown();
        }
    }
}
import java.util.concurrent.*;

/** Polls Future.isDone() every 200 ms until the submitted task has finished. */
public class FutureIsDoneExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executorService.submit(() -> {
                Thread.sleep(2000);
                return "Hello from Callable";
            });

            while (!future.isDone()) {
                System.out.println("Task is still not done...");
                Thread.sleep(200);
            }

            System.out.println("Task completed! Retrieving the result");
            String result = future.get();
            System.out.println(result);
        } finally {
            // Shut the executor down even when get() or sleep() throws.
            executorService.shutdown();
        }
    }
}
输出结果如下:
# Output
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task completed! Retrieving the result
Hello from Callable
import java.util.concurrent.*;

/**
 * Cancels a running task once more than one second has elapsed.
 *
 * <p>Calling get() on the cancelled Future throws CancellationException; this
 * example deliberately does not guard against that.
 */
public class FutureCancelExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            long startTime = System.nanoTime();
            Future<String> future = executorService.submit(() -> {
                Thread.sleep(2000);
                return "Hello from Callable";
            });

            while (!future.isDone()) {
                System.out.println("Task is still not done...");
                Thread.sleep(200);
                double elapsedTimeInSec = (System.nanoTime() - startTime) / 1000000000.0;

                if (elapsedTimeInSec > 1) {
                    future.cancel(true);
                }
            }

            System.out.println("Task completed! Retrieving the result");
            // Throws CancellationException because the task was cancelled above.
            String result = future.get();
            System.out.println(result);
        } finally {
            // Without this the executor is never shut down once get() throws.
            executorService.shutdown();
        }
    }
}
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task completed! Retrieving the result
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.sankuai.stafftraining.wujiahong.demo.springdemo.concurrency.MainApp.test3(MainApp.java:79)
at com.sankuai.stafftraining.wujiahong.demo.springdemo.concurrency.MainApp.main(MainApp.java:12)
最好是通过下面这种方法进行判断:
1
2
3
4
5
6
7
if(!future.isCancelled()){System.out.println("Task completed! Retrieving the result");Stringresult=future.get();System.out.println(result);}else{System.out.println("Task was cancelled");}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/** Runs three Callables with invokeAll() and prints each result. */
public class InvokeAllExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            Callable<String> task1 = () -> {
                Thread.sleep(2000);
                return "Result of Task1";
            };

            Callable<String> task2 = () -> {
                Thread.sleep(1000);
                return "Result of Task2";
            };

            Callable<String> task3 = () -> {
                Thread.sleep(5000);
                return "Result of Task3";
            };

            List<Callable<String>> taskList = Arrays.asList(task1, task2, task3);

            List<Future<String>> futures = executorService.invokeAll(taskList);

            for (Future<String> future : futures) {
                // The result is printed only after all the futures are complete. (i.e. after 5 seconds)
                System.out.println(future.get());
            }
        } finally {
            // Shut the pool down even when invokeAll() or get() throws.
            executorService.shutdown();
        }
    }
}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/** Runs three Callables with invokeAny() and prints the single result it returns. */
public class InvokeAnyExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            Callable<String> task1 = () -> {
                Thread.sleep(2000);
                return "Result of Task1";
            };

            Callable<String> task2 = () -> {
                Thread.sleep(1000);
                return "Result of Task2";
            };

            Callable<String> task3 = () -> {
                Thread.sleep(5000);
                return "Result of Task3";
            };

            // Returns the result of the fastest callable. (task2 in this case)
            String result = executorService.invokeAny(Arrays.asList(task1, task2, task3));

            System.out.println(result);
        } finally {
            // Shut the pool down even when invokeAny() throws.
            executorService.shutdown();
        }
    }
}
// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get();
或者:
1
2
3
4
5
6
7
8
9
10
// Using Lambda Expression
CompletableFuture<Void>future=CompletableFuture.runAsync(()->{// Simulate a long-running Job
try{TimeUnit.SECONDS.sleep(1);}catch(InterruptedExceptione){thrownewIllegalStateException(e);}System.out.println("I'll run in a separate thread than the main thread.");});
// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);
或者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Using Lambda Expression: the Supplier is written as a lambda instead of
// an anonymous inner class.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);
// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Rajeev";
});

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
    return "Hello " + name;
});

// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev
// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName());
});
CompletableFuture<String>downloadWebPage(StringpageLink){returnCompletableFuture.supplyAsync(()->{// Code to download and return the web page's content
});}
下载一个网站的100个不同的页面,使用allOf方法:
1
2
3
4
5
6
7
8
9
10
11
12
List<String> webPageLinks = Arrays.asList(...); // A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());

// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
    return pageContentFutures.stream()
            .map(pageContentFuture -> pageContentFuture.join())
            .collect(Collectors.toList());
});

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());
// Three futures that sleep for 2 s, 1 s and 3 s respectively.
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2
Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if (age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if (age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    // Log the failure here. Choose the return value with care: you can
    // return a designated fallback value and filter it out downstream.
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get());
Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if (age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if (age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    // handle() runs on both success and failure; ex is non-null on failure.
    if (ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());
-------------
Main Thread Id: Thread[main,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-9,5,main]------thenApply Thread Id : Thread[ForkJoinPool.commonPool-worker-9,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-9,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-9,5,main]
-------------No Sleep
Main Thread Id: Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApply Thread Id : Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]