0

Future/CompletableFuture与Lambda

 2 years ago
source link: https://segmentfault.com/a/1190000040854069
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Future/CompletableFuture与Lambda

发布于 今天 14:42

Future和CompletableFuture

Runnable task = () -> System.out.println("Hello World");
ExecutorService service = Executors.newSingleThreadExecutor();
//直接通过ExecutorService执行
Future<?> future = service.submit(task);
//或者通过CompletableFuture调用ExecutorService执行
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(task, service);
//还可以写为直接调用task,不通过ExecutorService
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(task);
//执行完毕后需要关闭,否则线程仍然卡着
service.shutdown();
//输出
Hello World
//Callable方式
Supplier<String> task = () -> {
    System.out.println("Hello World!");
    return "Hello World!";
};
ExecutorService service = Executors.newSingleThreadExecutor();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(task, service);
//或者写为不通过ExecutorService的方式
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(task);
//获取执行结果
String string = completableFuture.join();
System.out.println(string);
service.shutdown();
//输出
Hello World
Hello World

Future有五个方法

//阻塞等待获取结果
T get()
//设定超时时间获取结果,否则返回excepiton
T get(Long timeout,TimeUnit unit)
//取消该线程
void cancel()
//获取是否已经完成
boolean isDone()
//获取是否被取消了
boolean isCancelled()

CompletableFuture也有五个方法

//同Future的get()方法
T join()
//检测线程的执行状态,如果完成返回结果,未完成则返回默认的valueIfAbsent的值
T getNow(T valueIfAbsent)
//强制返回值方法,如果线程完成了,则返回get或者join的结果,如果未完成,则强制中断线程,并将设置的value返回
boolean complete(V value)
//强制返回值方法,如果线程完成了,则强制设置value为返回值,如果未完成,则强制完成线程,并将设置的value返回
void obtrudeValue(V value)
//强制返回Exception,如果线程未完成,则强制完成
boolean completeExceptionally(Throwable t)
//强制返回Exception,即使线程完成了,也强制完成,强制返回异常状态
void obtrudeException(V value)

complete方法,如果未完成强制完成

Supplier<String> task = () -> {
    try {
        Thread.sleep(500);
    }catch(Exception e){

    }
    return "Hello World!"+Thread.currentThread().getName();
};
ExecutorService service = Executors.newSingleThreadExecutor();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(task, service);
completableFuture.complete("Too long");
String string = completableFuture.join();
System.out.println(string);
service.shutdown();
//输出
Too long

//但是如果另外一种写法
Supplier<String> task = () -> {
    try {
        Thread.sleep(500);
    }catch(Exception e){

    }
    return "Hello World!"+Thread.currentThread().getName();
};
ExecutorService service = Executors.newSingleThreadExecutor();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(task, service);
try {
    Thread.sleep(800);
}catch(Exception e){

}
completableFuture.complete("Too long");
String string = completableFuture.join();
System.out.println(string);
service.shutdown();
//输出
Hello World!pool-1-thread-1

obtrudeValue方法,无论线程是否完成都设置value为返回值

Supplier<String> task = () -> {
    try {
        Thread.sleep(500);
    }catch(Exception e){

    }
    return "Hello World!"+Thread.currentThread().getName();
};
ExecutorService service = Executors.newSingleThreadExecutor();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(task, service);
try {
    Thread.sleep(800);
}catch(Exception e){

}
completableFuture.obtrudeValue("Too long");
String string = completableFuture.join();
System.out.println(string);
service.shutdown();
//输出
Too long

first task: supplier supplyAsync(),runable runAsync()
seconde task:runable thenRun(),consumer thenAccept(),function thenApply()

//Runable   ()->System.out.println("Person read")    void run()    thenRun()
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()-> System.out.println("run first"))
.thenRun(()-> System.out.println("then run"));
//Consumer  n->System.out.println(n+"Person read")    void accept()    thenAccept()
//Function  id -> readPersonFromDB(id)                Person apply()    thenApply()

//具体执行方法
CompletableFuture<List<Person>>  completableFuture = CompletableFuture.supplyAsync(()-> Arrays.asList(1,2,3))
.thenApply(list -> readPerson(list));
completableFuture.thenRun(()-> System.out.println("The list of Person had been read"));
completableFuture.thenAccept(persons -> System.out.println(persons.size()));
public static List<Person> readPerson(List<Integer> list){
    List<Person> personList = new ArrayList();
    for(int i :list ){
        personList.add(new Person("name",i));
    }
    return personList;
}
//输出
The list of Person had been read
3

在哪个线程执行

CompletableFuture<List<Person>> c2 = CompletableFuture.runAsync()//common FJ Pool
c2.thenApplyAsync()//provided pool of threads
c2.thenRun()//the same thread
c2.thenAcceptAsync()//the same pool of thread

CompletableFuture

public class TestCompletableFuture {
    public static void main(String[] args) throws Exception{
        Supplier<List<Long>> supplyIDs = () -> {
            sleep(200);
            return Arrays.asList(1L,2L,3L);
        };
        Function<List<Long>, List<User>> fetchUsers = ids -> {
            sleep(300);
            return ids.stream().map(User::new).collect(Collectors.toList());
        };
        Consumer<List<User>> displayer = users -> users.forEach(System.out::println);
        CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIDs);
        completableFuture.thenApply(fetchUsers).thenAccept(displayer);
        sleep(10000);
    }
    private static void sleep(int time){
        try{
            Thread.sleep(time);
        }catch (Exception e){
        }
    }
}
//输出
User{id=1}
User{id=2}
User{id=3}
//还可增加ExecutorService保持在单独的线程内执行
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIDs,executorService);
completableFuture.thenApply(fetchUsers).thenAcceptAsync(displayer,executorService);


Function<List<Long>, CompletableFuture<List<User>>>内置CompletableFuture方法
public class TestCompletableFuture {
    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        ExecutorService executorService2 = Executors.newSingleThreadExecutor();
        Supplier<List<Long>> supplyIDs = () -> {
            sleep(200);
            System.out.println("supplyIDs=="+Thread.currentThread().getName());
            return Arrays.asList(1L,2L,3L);
        };
        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers = ids -> {
            System.out.println("fetchUsers=="+Thread.currentThread().getName());
            sleep(300);
            Supplier<List<User>> userSuplier = () -> {
                System.out.println("userSuplier=="+Thread.currentThread().getName());
                return ids.stream().map(User::new).collect(Collectors.toList());
            };
            return CompletableFuture.supplyAsync(userSuplier,executorService2);
        };
        Consumer<List<User>> displayer = users -> users.forEach(System.out::println);
        CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIDs,executorService);
        completableFuture.thenComposeAsync(fetchUsers,executorService2).thenAcceptAsync(displayer,executorService);
        sleep(10000);
        executorService.shutdown();
    }
    private static void sleep(int time){
        try{
            Thread.sleep(time);
        }catch (Exception e){
        }
    }
}

thenAcceptBoth可以同时完成两个不同的CompletableFuture

ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
Supplier<List<Long>> supplyIDs = () -> {
    sleep(200);
    System.out.println("supplyIDs=="+Thread.currentThread().getName());
    return Arrays.asList(1L,2L,3L);
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers = ids -> {
    System.out.println("fetchUsers=="+Thread.currentThread().getName());
    sleep(300);
    Supplier<List<User>> userSuplier = () -> {
        System.out.println("userSuplier=="+Thread.currentThread().getName());
        return ids.stream().map(User::new).collect(Collectors.toList());
    };
    return CompletableFuture.supplyAsync(userSuplier,executorService2);
};
Function<List<Long>, CompletableFuture<List<Email>>> fetchEmails = ids -> {
    System.out.println("fetchUsers=="+Thread.currentThread().getName());
    sleep(300);
    Supplier<List<Email>> emailSuplier = () -> {
        System.out.println("userSuplier=="+Thread.currentThread().getName());
        return ids.stream().map(Email::new).collect(Collectors.toList());
    };
    return CompletableFuture.supplyAsync(emailSuplier,executorService2);
};
Consumer<List<User>> displayer = users -> users.forEach(System.out::println);
CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIDs,executorService);
CompletableFuture<List<User>> userComp  = completableFuture.thenComposeAsync(fetchUsers,executorService2);
CompletableFuture<List<Email>> emailComp  = completableFuture.thenComposeAsync(fetchEmails,executorService2);
userComp.thenAcceptBoth(emailComp,
        (user,email) ->{
            System.out.println(user.size()+""+email.size());
        });
sleep(1000);
executorService.shutdown();
executorService2.shutdown();

acceptEither执行完成任何一个后再执行其他操作

public static void main(String[] args) throws Exception{
    Supplier<List<Long>> supplyIDs = () -> {
        sleep(200);
        return Arrays.asList(1L,2L,3L);
    };
    Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = ids -> {
        sleep(300);
        Supplier<List<User>> userSuplier = () -> {
            return ids.stream().map(User::new).collect(Collectors.toList());
        };
        return CompletableFuture.supplyAsync(userSuplier);
    };
    Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = ids -> {
        sleep(3000);
        Supplier<List<User>> userSuplier = () -> {
            return ids.stream().map(User::new).collect(Collectors.toList());
        };
        return CompletableFuture.supplyAsync(userSuplier);
    };
    Consumer<List<User>> displayer = users -> users.forEach(System.out::println);
    CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIDs);
    CompletableFuture<List<User>> user1  = completableFuture.thenComposeAsync(fetchUsers1);
    CompletableFuture<List<User>> user2  = completableFuture.thenComposeAsync(fetchUsers2);
    user1.thenRun(()-> System.out.println("User1 done"));
    user2.thenRun(()-> System.out.println("User2 done"));
    user1.acceptEither(user2, displayer);
    sleep(100000);
}

CompletableFuture异常处理

exceptionally()
whenComplete()
handle()

CompletableFuture<List<User>> user3 = CompletableFuture.supplyAsync(supplyIDs)
        .thenApply(list -> readUser(list))
        .exceptionally(excetion -> new ArrayList<User>());
user3.thenRun(()-> System.out.println("user3 is run"));
//whenComplete
CompletableFuture<List<User>> user3 = CompletableFuture.supplyAsync(supplyIDs)
        .thenApply(list -> readUser(list))
        .whenComplete(
                (list, exception) -> {
                    if (list !=null)
                        System.out.println("run successfuly");
                    else
                        System.out.println("run exception");
                }
        );
//handle可以吞下异常,返回新的异常或者返回默认结果
CompletableFuture<List<User>> user3 = CompletableFuture.supplyAsync(supplyIDs)
        .thenApply(list -> readUser(list))
        .handle(
                (list, exception) -> {
                    if (list !=null)
                        return list;
                    else
                        return new ArrayList<User>();
                }
        );

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK