본문 바로가기
프로그래밍 놀이터/안드로이드, Java

[Java8 In Action] #11 CompletableFuture: 조합할 수 있는 비동기 프로그래밍

by 돼지왕 왕돼지 2018. 12. 31.
반응형

[Java8 In Action] #11 CompletableFuture: 조합할 수 있는 비동기 프로그래밍


Java8 In Action 내용을 보며 정리한 내용입니다.

정리자는 기존에 Java8 을 한차례 rough 하게 공부한 적이 있고, Kotlin 역시 공부한 적이 있습니다.

위의 prerequisite 가 있는 상태에서 추가적인 내용만 정리한 내용이므로, 제대로 공부를 하고 싶다면 책을 구매해서 보길 권장합니다!


allof, anyof, callable, completablefuture, CompletableFuture<Void>, completeExceptionally, custom executor, executor, executorservice, ForkJoinPool, Future, java8 in action, Join, supplyAsync, thenAcceptAsync, thenApply, thenCombine, thenCompose


-

최근 소프트웨어 구현 방법에 큰 변화를 불러온 두 가지 추세가 있다.

하나는 앱을 실행하는 하드웨어와 관련된 변화고 다른 하나는 앱 구조 특히 앱끼리 어떻게 상호작용하는가와 관련된 변화다.




11.1. Future


-

자바5부터 제공된다. 비동기 계산을 모델링하는 데 사용한다.

Future 를 이용하려면 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음에 ExecutorService 에 제출해야 한다.




11.1.1. Future 제한


-

Future 의 결과가 있을 때 이들의 의존성을 표현하기가 어렵다.




11.1.2. CompletableFuture 로 비동기 앱 만들기





11.2. 비동기 API 구현


11.2.1. 동기 메서드를 비동기 메서드로 변환


-

public Future<Double> getPriceAsync(String product){
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread( () -> {
        try{
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch(Exception e){
            futurePrice.completeExceptionally(e); 
        }
    }).start();
    return futurePrice;
}




11.2.2. 에러 처리 방법


* 팩토리 메서드 supplyAsync 로 CompletableFuture 만들기


-

public Future<Double> getPriceAsync(String product){
    return CompletableFuture.supplyAsync( () -> calculatePrice(product) );
}

ForkJoinPool 의 Executor 중 하나가 supplier 를 실행한다.

그리고 에러가 발생하면 completeExceptionally 도 자동으로 불러준다.





11.3. 비블록 코드 만들기


11.3.1. 병렬 스트림으로 요청 병렬화하기



11.3.2. CompletableFuture 로 비동기 호출 구현하기.


-

public List<String> findPrices(String product){
    List<CompletableFuture<String>> priceFutures = 
        shops.stream()
            .map(shop -> CompltableFuture.supplyAsync( () ->
                shop.getName() + “ price if “ + shop.getPrice(product)))
            .collect(Collectors.toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}



11.3.3. 더 확장성이 좋은 해결 방법


-

ComletableFuture 는 병렬 스트림 버전에 비해 작업에 이용할 수 있는 다양한 Executor 를 지정할 수 있다는 장점이 있다.




11.3.4. 커스텀 Executor 사용하기


-

스레드 풀이 너무 크면 CPU 와 메모리 자원을 서로 경쟁하느라 시간을 낭비할 수 있다.

반면 스레드 풀이 너무 작으면 CPU 의 일부 코어는 활용되지 않을 수 있다.

다음과 같은 공식으로 Thread 갯수를 정하면 좋다.


threadNum = cpuNum * cpuUsage * ( 1 + W/C )


cpuNum 은 Runtime.getRuntime().availableProcessors() 가 반환하는 코어 수

cpuUsage 는 0 과 1 사이의 값을 갖는 CPU 활용 비율

W/C 는 대기시간과 계산시간의 비율



-

Executors.newFixedThreadPool( Math.min(shops.size(), 100), new ThreadFactory(){
        public Thread newThread(Runnable r){
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    }
});

자바에서 일반 스레드가 실행 중이면 자바 프로그램은 종료되지 않는다.

따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는 일반 스레드가 있으면 문제가 될 수 있다.

반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있다.

두 스레드의 성능은 같다.



-

비동기 동작을 많이 사용하는 상황에서는 Custom Executor 를 설정하는 기법이 가장 효과적일 수 있다.



-

I/O 가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 가장 구현하기 간단하며 효율적일 수 있다. ( 모든 스레드가 계산 작업을 수행하는 상황에서는 프로세서 코어 수 이상의 스레드를 가질 필요가 없다. )


반면 작업이 I/O 를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture 가 더 많은 유연성을 제공하며 대기/계산(W/C) 의 비율에 적합한 스레드 수를 설정할 수 있다. 특히 스트림의 게으른 특성 때문에 스트림에서 I/O 를 실제로 언제 처리할지 예측하기 어려운 문제도 있다.





11.4. 비동기 작업 파이프라인 만들기


11.4.1. 할인 서비스 구현



11.4.2. 할인 서비스 사용



11.4.3. 동기 작업과 비동기 작업 조합하기


-

public List<String> findPrices(String product){
    List<CompletableFuture<String>> priceFutures = shops.stream()
        .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice(product), executor ))
        .map(future -> future.thenApply(Quote::parse))
        .map(future -> future.thenCompose(quote -> CompetableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor) ))
        .collect(toList());

    return priceFutures.stream()
        .map(CompletableFuture::join)
        .collect(toList());
}


-

CompletableFuture.supplyAsync 에 lambda 를 전달해 비동기적으로 가격정보를 가져온다.

첫번째 mapping 의 supplyAsync 의 return 값은 CompletableFuture<String> 이다.

두번째 mapping 의 thenApply 의 return 값은 CompletableFuture<Quote> 이다.


thenApply 메서드는 CompletableFuture 가 끝날 때까지 실행되지 않는다.

즉, CompletableFuture 가 동작을 완전히 완료한 다음에 thenApply 메서드로 전달된 람다 표현식을 적용할 수 있다.


세번째 mapping 의 thenCompose 는 CompletableFuture<String> 이 반환된다.


자바8의 CompletableFuture API 는 위와 같은 두 비동기 연산을 파이프라인으로 만들 수 있도록 thenCompose 메서드를 제공한다.

thenCompose 메서드는 첫 번째 연산의 결과를 두번째 연산으로 전달한다.


마지막으로 CompletableFuture 들에 대해 join 을 통해 기다렸다가 최종 결과를 수집할 수 있다.




11.4.4. 독립 CompletableFuture 와 비독립 CompletableFuture 합치기


-

thenCompose 는 앞선 Future 의 실행 결과를 입력으로 받는 다음의 Future 로 전달하는 연할을 한다.

그러나 독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야 하는 상황에서는 thenCombine 메서드를 사용한다.

thenCombine 은 BiFunction 을 두번째 인수로 받는다.

BiFunction 은 CompletableFuture 결과를 어떻게 합칠지 정의한다.


thenCompose 와 마찬가지로 thenCombine 메서드에도 Async 버전이 존재한다.



-

Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync( () -> shop.getPrice(product) )
        .thenCombine( CompletableFuture.supplyAsync( () -> exchangeService.getRate(Money.EUR, Money.USD) ), (price, rate) -> price * rate ) );




11.4.5. Future 의 리플렉션과 ComletableFuture 의 리플렉션





11.5. CompletableFuture 종료에 대응하는 방법


11.5.1. 최저가격 검색 앱 리팩토링


-

자바 8 의 CompletableFuture API 는 thenAccept 라는 메서드로 CompletableFuture 에 동작이 끝나면 값을 소비하게 할 수 있다.

Stream<CompletableFuture<String> stream = getFindPricesStream(“myPhone”);
stream.map( f -> f.thenAccept(System.out::println));
// thenAcceptAsync 도 있다.

thenAccept 는 CompletableFuture<Void> 를 return 한다.



-

모든 future 의 종료를 detect 하려면 아래와 같이 하면 된다.

CompletableFuture[] futures = findPricesStream(“myPhone”)
        .map( f -> f.thenAccept(System.out::println) )
        .toArray( size -> new CompletableFuture[size] );
CompletableFuture.allOf(futures).join();

allOf 팩토리 메서드는 CompletableFuture 배열을 입력으로 받아 CompletableFuture<Void> 를 반환한다.

전달된 모든 CompletableFuture 가 완료되어야 CompletableFuture<Void> 가 완료된다.



-

모든 종료가 아닌 하나의 종료를 detect 하려면 anyOf 를 사용하면 된다.

CompletableFuture 배열을 입력으로 받아서 CompletableFuture<Object> 를 반환한다.

CompletableFuture<Object>는 처음으로 완료한 CompletableFuture 의 값으로 동작을 완료한다.




11.5.2. 응용





11.6. 요약


-

한 개 이상의 원격 외부 서비스를 사용하는 긴 동작을 실행할 때는 비동기 방식으로 앱의 성능과 반응성을 향상시킬 수 있다.



-

우리 고객에게 비동기 API 를 제공하는 것을 고려해야 한다.

CompletableFuture 의 기능을 이용하면 쉽게 비동기 API 를 구현할 수 있다.



-

CompletableFuture 를 이용할 때 비동기 태스크에서 발생한 에러를 관리하고 전달할 수 있다.



-

동기 API 를 CompletableFuture 로 갑싸서 비동기적으로 소비할 수 있다.



-

서로 독립적인 비동기 동작이든 아니면 하나의 비동기 동작이 다른 비동기 동작의 결과에 의존하는 상황이든 여러 비동기 동작을 조립하고 조합할 수 있다.



-

CompletableFuture 에 콜백을 등록해서 Future 가 동작을 끝내고 결과를 생산했을 때 어떤 코드를 실행하도록 지정할 수 있다.



-

CompletableFuture 리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다.




반응형

댓글