본문 바로가기
프로그래밍 놀이터/안드로이드, Java

[Java8 In Action] #7 병렬 데이터 처리와 성능

by 돼지왕 왕돼지 2018. 12. 27.
반응형

[Java8 In Action] #7 병렬 데이터 처리와 성능



Java8 In Action 내용을 보며 정리한 내용입니다.

정리자는 기존에 Java8 을 한차례 rough 하게 공부한 적이 있고, Kotlin 역시 공부한 적이 있습니다.

위의 prerequisite 가 있는 상태에서 추가적인 내용만 정리한 내용이므로, 제대로 공부를 하고 싶다면 책을 구매해서 보길 권장합니다!


ArrayList, arraylist linkedlist parallel stream, boxing unboxing overhead, Combiner, combiner parallel method, COMPUTE, concurrent, custom spliterator, default spliterator, distinct, doubly linked list, executorservice, Filter Stream, findfirst, fork, fork join framework, fork join framework debug, ForkJoinPool, HashSet, immutable, intstream.range, Invoke, java.util.concurrent.ForkJoinPool.common.parallelism, java8 in action, JIT, Join, Limit, LinkedList, longstream, nonnull, ordered, parallel, parallel stream, parallelStream, range parallel stream, rangeclosed, RecursiveAction, recursivetask, sequential, sequentialstream, sized, sized stream, sorted, split iterate, splitable iterator, spliterator, stream.iterate, subsized, tail, TreeSet, Warm Up, work stealing, 병렬 데이터 처리, 병렬 데이터 처리와 성능, 병렬 스트림, 병렬 스트림 측정, 분해성 나쁜 소스, 분해성 좋은 소스, 분해성 훌륭한 소스, 이중 연결 리스트, 작업 훔치기, 포크 조인 프레임워크



7.1. 병렬 스트림


-

병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.



7.1.1. 순차 스트림을 병렬 스트림으로 변환하기


-

순차 스트림에 parallel 메서드를 호출하면 병렬로 처리된다.

parallelStream 에 sequential 을 호출하면 순차 스트림이 된다.



-

병렬 스트림은 내부적으로 ForkJoinPool 을 사용한다.

ForkJoinPool 은 프로세서 수, 즉 Runtime.getRuntime().availableProcessors() 가 반환하는 값에 상응하는 스레드를 갖는다.


이 값을 바꾸려면 아래와 같이 해주면 된다.( 전역 설정 )

System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”);



7.1.2. 스트림 성능 측정


-

성능을 최적화할 때는 세 가지 황금 규칙을 기억해야 한다.

측정! 측정! 그리고 측정!



-

Stream.iterate(1L, i -> i + 1)
    .limit(n)
    .parallel()
    .reduce(0L, Long::sum);

위의 코드는 iterate 가 Boxing 형 객체를 생성하고, iterate 는 병렬로 실행할 수 있도록 독립적인 청크를 분할하기가 어렵다.

그래서 for 문, SequentialStream, ParallelStream 구현 중 가장 느리게 작동한다.



더 특화된 메서드 사용


-

LongStream.rangeClosed 는 기본형 long 을 직접 사용해서 boxing, unboxing overhead 가 없다.

또한 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다.



-

병렬화는 공짜가 아니라는 사실을 기억해야 한다. 병렬화를 이용하려면 스트림을 재귀적으로 분할해야 하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 한다.

멀티코어 간의 데이터 이동은 우리 생각보다 비싸다. 따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다.

또한 상황에 따라 쉽게 병렬화를 이용할 수 있거나 아니면 아예 병렬화를 이용할 수 없는 때도 있다.

그리고 스트림을 병렬화해서 코드 실행 속도를 빠르게 하고 싶으면 항상 병렬화를 올바르게 사용하고 있는지 확인해야 한다.




7.1.3. 병렬 스트림의 올바른 사용법


-

병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문.




7.1.4. 병렬 스트림 효과적으로 사용하기


-

언제 병렬 스트림을 사용해야 할지 갯수 측면에서 확신이 서지 않는다면 직접 측정하라.

순차 스트림과 병렬 스트림의 변환은 너무 간단하다.

둘 다 벤치마크로 직접 성능을 측정하는 것이 바람직하다.



-

박싱을 주의하라. 되도록이면 기본형 특화 스트림을 사용하는 것이 좋다.



-

순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다.

특히 limit 이나 findFirst 처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다.

(findAny 는 순서 상관없어서 성능에 영향이 없다.)



-

스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.

처리해야 할 요소 수가 N 이고 하나의 요소를 처리하는데 드는 비용을 Q 라고 하면 전체 스트림 파이프라인 처리비용은 N * Q.

Q 가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.



-

소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.

병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻기 쉽지 않기 떄문이다.



-

스트림을 구성하는 자료구조가 적절한지 확인하라.

예를 들어 ArrayList 를 LinkedList 보다 효율적으로 분할할 수 있다.

LinkedList 는 분할하기 위해 요소 탐색을 해야 하기 때문이다.


또한 range 팩토리 메서드로 만든 기본형 스트림도 쉽게 분해할 수 있다.

커스텀 Spliterator 를 구현해서 분해 과정을 완벽하게 제어할 수도 있다.



-

스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.

예를 들어 SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있다.

반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.



-

최종 연산의 병합 과정, 예를 들면 Collector 의 combiner 메서드, 비용을 살펴보라. 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분결과를 합치는 과정에서 상쇄될 수 있다.



-

분해성 훌륭한 소스 : ArrayList, IntStream.range

분해성 좋은 소스 : HashSet, TreeSet

분해성 나쁜 소스 : LinkedList, Stream.iterate



-

병렬 스트림이 수행되는 내부 인프라구조는 자바7에서 추가된 포크/조인 프레임워크이다.






7.2. 포크/조인 프레임워크


-

포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다. 포크/조인 프레임워크에서는 서브태스크를 스레드풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.



7.2.1. RecursiveTask 활용


-

스레드풀을 이용하려면 RecursiveTask<R> 의 서브클래스를 만들어야 한다.

여기서 R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때는 RecursiveAction 형식이다.

RecursiveTask 를 정의하려면 추상 메서드 compute 를 구현해야 한다.


protected abstract R compute();


compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.

따라서 대부분의 compute 메서드 구현은 다음과 같은 수도코드 형식을 한다.


if(태스크가 충분히 작거다 더 이상 분할할 수 없으면){

    순차적으로 태스크 계산

}else{

    태스크를 두 서브태스크로 분할

    태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출

    모든 서브태스크의 연산이 완료될 때까지 기다림

    각 서브태스크의 결과를 합침

}



-

태스크를 더 분할할 것인지 말 것인지 정해진 기준은 없지만 몇 가지 경험적으로 얻은 좋은 데이터가 있는데, 이는 나중에 다룬다.



-

ex)


public class ForkJoinSumCalculator extends RecursiveTask<Long>{

private final long[] numbers;

private final int start;

private final int end;

public static final long THRESHOLD = 10_000;


public ForkJoinSumCalculator(long[] numbers){

this(numbers, 0, numbers.length);

}


private ForkJoinSumCalculator(long[] numbers, int start, int end){

this.numbers = numbers;

this.start = start;

this.end = end;

}


@Override

protected Long compute(){

int length = end - start;

if(length <= THRESHOLD){

return computeSequentially();

}


ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);

leftTask.fork();


ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);

Long rightResult = rightTask.compute();

Long leftResult = leftTask.join();

return leftResult + rightResult;

}


private long computeSequentially(){

long sum = 0;

for( int i=start; i < end; i++){

sum += numbers[i];

}

return sum;

}

}

public static long forkJoinSum(long n){
    long[] numbers = LongStream.rangeClosed(1, n).toArray();
    ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
    return new ForkJoinPool().invoke(task);
}

일반적으로 ForkJoinPool 은 앱에서 하나만 instance(싱글톤) 를 만든다.



ForkJoinSumCalculator 실행


-

ForkJoinPool 의 invoke 로 ForkJoinTask 를 전달하면 compute 메서드를 실행하면서 작업을 수행한다.




7.2.2. 포크/조인 프레임워크를 제대로 사용하는 방법


-

join 메서드를 태스크에 호출하면 테스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join 을 호출해야 한다.



-

RecursiveTask 내에서는 ForkJoinPool 의 invoke 메서드를 사용하지 말아야 한다. 대신 compute 나 fork 메서드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할 때만 invoke 를 사용한다.



-

서브태스크에 fork 메서드를 호출해서 ForkJoinPool 의 일정을 조절할 수 있다. 왼쪽 작업과 오른쪽 작업 모두에 fork 메서드를 호출하는 것이 자연스러울 것 같지만 한쪽 작업에는 fork 를 호출하는 것보다 compute 를 호출하는 것이 효율적이다. 그러면 두 서브 태스크의 한 태스크에는 같은 스레드를 재사용할 수 있어 불필요한 태스크를 풀에 할당하는 오버헤드를 피할 수 있다.



-

포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅하기 어렵다. 



-

병렬 스트림에서 살펴본 것처럼 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠를 거라는 생각은 버려야 한다. 병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다. 각 서브태스크의 실행시간은 새로운 태스크를 포킹하는 데 드는 시간보다 길어야 한다. 또한 순차 버전과 병렬 버전의 성능을 비교할 때는 다른 요소도 고려해야 한다. 다른 자바 코드와 마찬가지로 JIT 컴파일러에 의해 최적화되려면 몇 차례의 준비 과정(warm up) 또는 실행 과정을 거쳐야 한다. 따라서 성능을 측정할 때는 여러 번 프로그램을 실행할 결과를 측정해야 한다. 또한 컴파일러 최적화는 병렬 버전보다 순차 버전에 집중될 수 있다는 사실도 기억하자.




7.2.3. 작업 훔치기


-

실제로는 코어 개수와 관계없이 적절한 크기로 분할된 많은 태스크를 포킹하는 것이 바람직하다. 이론적으로 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 CPU 코어에서 태스크를 실행할 것이고 크기가 같은 각각의 태스크는 같은 시간에 종료될 것이라 생각할 수 있다. 하지만 복잡한 시나리오가 사용되는 현실에서는 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있다. 분할 기법이 효율적이지 않았기 때문일 수도 있고, 예기치 않게 디스크 접근 속도가 저하되었거나 외부 서비스와 협력하는 과정에서 지연이 생길 수 있기 때문이다.



-

포크/조인 프레임워크에서는 위와 같은 문제를 작업 훔치기(work stealing)라는 기법으로 해결한다. 작업 훔치기 기법에서는 ForkJoinPool 의 모든 스레드를 거의 공정하게 분할한다. 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트(doubly linked list)를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다. 이때 한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있다. 이때 할일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드 큐의 꼬리(tail)에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼 때까지, 즉 모든 큐가 빌 때까지 이 과정을 반복한다. 따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있다.






7.3. Spliterator

(사실 Spliterator 는 언제 써야 하는지.. 어떻게 써야 하는지 잘 모르겠다.. 다른 말로 이걸 사용할 일이 있을까싶은.. 너무 고급 주제인듯하다..)


-

Spliterator 는 Split + Iterate 로 분할할 수 있는 반복자라는 의미이다. ( or Splitable iterator )



-

Spliterator 는 Iterator 처럼 요소 탐색 기능을 제공하면서도 병렬 작업에 특화된 녀석이다.

자바8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공한다.

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action); // hasNext + next 를 합쳐놓은 녀석?
    Spliterator<T> trySplit(); // 분할해서 다른 Spliterator 를 생성
    long estimateSize(); // 탐색해야 할 요소 수 정보, 이 값을 사용해서 더 공평하게 나눌 수 있음
    int characteristics();
}



7.3.1. 분할 과정


-

첫번째 Spliterator 에 trySplit 을 호출하면 두 번째 Spliterator 가 생성된다.

Spliterator 에 다시 trySplit 을 호출해서 null 이 나올대까지 이 과정을 반복한다.



Spliterator 특성


-

ORDERED : 정해진 순서가 있음

DISTINCT : x.equals(y) 는 항상 false 를 반환

SORTED : 탐색된 요소는 미리 정의된 정렬 순서를 따름.

SIZED : 크기가 알려진 소스.

NONNULL : nonnull

IMMUTABLE : 소스가 불변. 요소를 탐색하는 동안 요소를 추가하거나 삭제할 수 없다.

CONCURRENT : 동기화 없이 Spliterator 의 소스를 여러 스레드에서 동시에 고칠 수 있다.

SUBSIZED : 이 Spliterator 그리고 분할되는 모든 Spliterator 는 SIZED 특성을 갖는다.




7.3.2. 커스텀 Spliterator 구현하기


함수형으로 단어 개수 계산 메서드 재구현하기



WordCounter 병렬로 수행하기

class WordCounterSpliterator implements Spliterator<Character> {
    private final String string;
    private int currentChar = 0;

    public WordCounterSpliterator(String string){
        this.string = string
    }

    @Override
    public boolean tryAdvance(Consume<? super Character> action){
        action.accept(string.charAt(currentChar++));
        return currentChar < string.length();
    }

    @Override
    public Spliterator<Character> trySplit(){
        int currentSize = string.length() - currentChar;
        if ( currentSize < 10 ){
            return null;
        }

        for(int splitPos = currentSize / 2 + currentChar; splitPos < String.length(); splitPos++){
            if (Character.isWhitespce(string.charAt(splitPos))){
                Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
                currentChar = splitPos;
                return spliterator;
            }
        }
        return null;
    }

    @Override
    public long estimateSize(){
        return string.length() - currentChar;
    }

    @Override
    public int characteristics(){
        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
}


WordCounterSpliterator 활용

Spliterator<Character> sliterator = new WordCounterSpliterator(SENTENCE);

Stream<Character> stream = StreamSupport.stream(spliterator, true);

System.out.println("Found " + countWords(stream) + " words");





7.4. 요약


-

내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리할 수 있다.



-

간단하게 스트림을 병렬로 처리할 수 있지만 항상 병렬 처리가 빠른 것은 아니다. 병렬 소프트웨어 동작 방법과 성능은 직관적이지 않을 때가 많으므로 병렬 처리를 사용했을 때 성능을 직접 측정해봐야 한다.



-

병렬 스트림으로 데이터 집합을 병렬 실행할 때 특히 처리해야 할 데이터가 아주 많거나 각 요소를 처리하는 데 오랜 시간이 걸릴 때 성능을 높일 수 있다.



-

가능하면 기본형 특화 스트림을 사용하는 등 올바른 자료구조 선택이 어떤 연산을 병렬로 처리하는 것보다 성능적으로 더 큰 영향을 미칠 수 있다.



-

포크/조인 프레임워크에서는 병렬화할 수 있는 태스크를 작은 태스크로 분할한 다음에 분할된 태스크를 각각의 스레드로 실행하며 서브태스크 각각의 결과를 합쳐서 최종 결과를 생산한다.



-

Spliterator 는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화할 것인지 정의한다.





반응형

댓글