Search

Stream API 병렬 데이터 처리하기

목차

1. 개요

JDK 7 부터는 포크/조인 프레임워크(fork/join framework)을 이용해서 쉽게 병렬화를 사용할 수 있도록 도와준다. 이 포크/조인 프레임워크와 내부적인 병렬 스트림 처리가 어떤 관계가 있는지부터 병렬 스트림을 제대로 사용하기 위한 기반 지식에 대해 알아보자.
그래서 최종적으로 Spliterator를 커스텀하게 만들어서 분할 과정을 원하는 식으로 만들어 볼 것이다.

2. 병렬 스트림

스트림에서 병렬화를 하는 방식은 너무 간단하다. 병렬화 하고자 하는 구조가 컬렉션이라면 parallelStream() 을 호출하면 병렬 스트림(parallel stream)이 생성된다. 배열이나 그 외의 요소도 stream().parallel() 만 호출해주면 병렬 스트림이 생성될 것이다. 병렬 스트림은 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림으로 전통적인 방식부터 병렬 스트림을 이용한 방식까지 각각 로직을 구현해서 성능을 비교해보자.

스트림 성능 측정해보기

숫자 합산 로직

1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 여러 방식을 이용해 다음과 같이 생성한다고 했을 때 각각의 성능차이는 어떨까?
/** * 순차 리듀싱 * 무한 스트림을 사용해 합계를 계산한다. */ public long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .reduce(0L, Long::sum); } /** * 반복형 * 스트림을 사용하지 않고 전통적인 방식으로 합계를 계산해 반환한다. */ public long iterativeSum(long n) { long result = 0; for (long i = 1L; i <= n; i++) { result += i; } return result; }
Java
복사
아직 병렬화를 안했는데, 병렬화는 parallel()만 호출해주면 된다.
/** * 병렬 리듀싱 * 무한 스트림을 병렬화 처리해 합계를 반환한다. */ public long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() // 스트림을 병렬 스트림으로 변환한다. .reduce(0L, Long::sum); }
Java
복사
이제 무언가 성능적인 개선이 있을 것으로 추측된다. 하지만, 아쉽게도 위 코드에서 병렬화 스트림으로 변환해서 생기는 이점은 없을 것이다.

why?

순차 스트림에서 parallel을 호출해봐야 스트림 자체에는 변화가 없다.
내부적으로만 이후 연산이 병렬로 수행되야 함을 의미하는 플래그가 설정되는 것이다. (그래서 반대로 병렬 스트림을 순차 스트림으로 바꾸는 sequential() 메서드도 존재한다.)
그럼 병렬화를 위해 스레드 생성이나 몇 개나 생성할지, 해당 과정을 어떻게 커스터마이징 할까?
내부적으로는 ForkJoinPool을 이용하는데, Runtime.getRuntime().availableProcessors()가 반환하는 값에 맞는 스레드를 가진다. 일반적으로 기기의 프로세서 수와 같기에 특별한 사정이 잇는게 아니라면 기본값을 사용하는걸 추천한다.

JMH를 이용해 스트림 성능 측정해보기

실제로 병렬화를 이용했을때 성능이 어떻게 바뀌는지 테스트 해 볼 필요가있는데, JMH(Java Microbenchmark Harness) 라이브러리를 이용해 벤치마크를 구현해 볼 것이다.
@State(Scope.Thread) //벤치 마크에 사용되는 Argument의 상태를 Thread별로 초기화한다. @BenchmarkMode(Mode.AverageTime) // 벤치마크 대상 메서드를 실행하는데 걸린 평균 시간 측정 @OutputTimeUnit(TimeUnit.MILLISECONDS)// 벤치마크 결과를 밀리초 단위로 출력 @Fork(value = 2, jvmArgs = {"-Xms4g", "-Xmx4g"})//4Gb의 힙 공간을 제공한 환경에서 두 번 벤치마크 수행 public class ParallelStreamBenchMark { private static final long N = 10_000_000L; @Benchmark //벤치마크 대상 메서드 public long parallelRangeSum() { return LongStream.rangeClosed(1, N) .parallel() .reduce(0L, Long::sum); } @TearDown(Level.Invocation) // 매 번 벤치마크를 실행한 다음 가비지 컬렉터 동작 시도 public void tearDown() { System.gc(); } public static void main(String[] args) throws IOException, RunnerException { Options opt = new OptionsBuilder() .include(ParallelStreamBenchMark.class.getSimpleName()) .warmupIterations(10) // 사전 테스트 횟수 .measurementIterations(10) // 실제 측정 횟수 .forks(1) // .build(); new Runner(opt).run(); // 벤치마킹 시작 } }
Java
복사
여기서 벤치마크 대상 메서드를 반복문, 순차 리듀싱, 병렬 리듀싱 순으로 체크해보면 결과는 다음과 같이 나온다.
반복문
순차 리듀싱
병렬 리듀싱
생각외로 병렬 스트림이 순차 리듀싱에 비교해서 더 느린 결과가 나왔다.
더 느린 결과가 나온데는 두 가지 이유가 있다.
반복 결과로 박싱된 객체가 만들어지며 숫자를 더하기위해 언박싱을 해야 한다.
(중요)반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기 힘들다.
여기서 두 번째 이유가 제일 중요한데, 리듀싱 과정을 시작하는 시점에선 전체 숫자 리스트가 준비되지 않았기에 병렬로 처리할 수 있도록 청크로 분할할 수 없다. 즉 스트림을 병렬로 처리되도록 플래그까지 지정했지만, 실질적으로는 순차처리 방식과 크게 다른적이 없이 스레드 할당에 비용만 추가되어 오버헤드만 증가하게 되었다.
그럼 병렬 스트림을 제대로 활용하기 위해 상기된 두 가지 문제를 해결해보자.

특화된 스트림/ 특화된 메서드 사용

우선 첫 번째로 박싱/언박싱에 드는 비용을 줄이기 위해 특수한 스트림을 사용할 필요가 있다.
Stream대신 LongStream을 사용한다면 언박싱된 인덱스를 제공한다. 즉 언박싱에 드는 비용이 없다.
그리고 두 번째 문제였던 숫자 리스트의 준비 문제는 range(or rangeClosed)라는 메서드를 사용할 수 있다. 미리 특정 범위의 인덱스를 준비함으로써 청크로 분할할 수 있는 숫자 범위를 생산한다.
public long rangeSum(long n) { return LongStream.rangeClosed(1, n) .reduce(0L, Long::sum); }
Java
복사
이 로직도 성능 측정을 해보면 다음과 같은 결과를 보인다.
rangeSum의 벤치마킹 결과
iterate 팩토리 메서드로 생성한 순차 리듀싱보다 높은 성능을 보인다. 이는 오토박싱, 언박싱의 비용을 해결했기 때문이다. 그럼 여기에 병렬 스트림까지 적용해보면 어떨까?
public long parallelRangeSum(long n) { return LongStream.rangeClosed(1, n) .parallel() .reduce(0L, Long::sum); }
Java
복사
가장 느렸던 병렬 리듀싱에 비교해서 엄청나게 높아진 성능을 확인할 수 있다.
이처럼 병렬화는 무작정 사용한다고 좋기보단, 여러 조건들을 맞추고 올바른 자료구조를 선택할 때 그 빛을 발할 수 있다.

주의사항

병렬화는 아무렇게나 사용해도 되는 키워드는 아니다.
병렬화를 사용하기 위해서는 스트림을 재귀적으로 분할하고, 각 서브스트림을 다른 스레드의 리듀싱 연산으로 할당해야 하고, 이 결과를 다시 하나의 값으로 합쳐야 하는데, 멀티코어간의 데이터 이동은 비싸다.
그렇기에 코어 간 데이터 전송시간보다 오래 걸리는 작업만 병렬로 수행하는게 좋다.
또한, 위의 예시처럼 병렬화를 이용할 수 없는 상황에서는 무의미하기에 항상 병렬화를 올바르게 사용하고 있는지 검증할 필요가 있다.
한가지 더 있다.
병렬 스트림을 사용하면서 공유 자원의 상태를 변경하는 알고리즘을 사용할 경우에도 문제가 된다.
위와 같은 상황에서 1에서 n까지 병렬 스트림이 실행된다면, 성능은 둘째치고, 올바른 결과값이 나오지 않을 것이다.
public class SideEffectAccumulator { private static class Accumulator { public long total = 0; public void add(long value) { total += value; } } public long sideEffectSum(long n) { Accumulator acc = new Accumulator(); LongStream.rangeClosed(1, n).forEach(acc::add); return acc.total; } public long sideEffectParallelSum(long n) { Accumulator acc = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(acc::add); return acc.total; } public static void main(String[] args) { final Long N = 10_000_000L; SideEffectAccumulator sea = new SideEffectAccumulator(); System.out.println("sideEffectSum: "+ sea.sideEffectSum(N)); System.out.println("sideEffectParallelSum: "+ sea.sideEffectParallelSum(N)); } }
Java
복사
병렬 스트림을 사용하니 결과가 올바른 값(50000005000000)이 아닌 엉뚱한 값이 나오는데, 이는 여러번 시도해도 그렇다.
여러 스레드에서 동시에 누적자(total += value)를 실행하기에 이런 문제가 생기는데, 이런 결과를 만들지 않기 위해서는 상태 공유에 따른 부작용을 고려하고 피해야 한다.
그럼 병렬 스트림을 어떻게 사용을 해야 올바른 사용일까?

올바른 병렬 스트림 사용법

이펙티브 자바에서는 병렬 스트림을 아예 사용하는걸 지양하기를 권장한다.
그 이유는 위에 소개한바와 같다. 하지만 그래도 병렬 스트림을 이용해 성능 개선을 노려볼 생각이라면 다음의 내용이 약간의 힌트는 될 수 있다.
1.
항상 나 자신의 대한 믿음보다는 측정을 통해 분석하라. : 순차 스트림을 병렬 스트림으로 바꾸는것은 플래그 하나만 바꾸면 되기에 간단하다. 그렇기에 병렬 스트림이 순차 스트림보다 실제로 개선이 되는지 적절한 벤치마크로 직접 성능을 측정하는게 좋다. 단순히 병렬 스트림을 썼으니 좋아졌을 꺼라고 희망회로를 돌려보다간 위의 예시처럼 성능이 더 느리거나 최악의 경우에는 값마저 제대로 안나올 수 있다.
2.
박싱을 주의하라. : 자동 박싱과 언박싱은 성능을 크게 떨어트리는 요소 중 하나다. 자바에서는 이런 박싱 비용을 절약하기위해 특화 스트림(IntStream, LongStream, DoubleStream)을 제공한다. 따라서 되도록 기본형 특화 스트림을 사용하는게 좋다.
3.
순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. : 순서에 의존하는 연산(ex: limit, findFirst)은 병렬 스트림에서 수행하기위해선 많은 비용이 필요하다.
4.
스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라 : 처리해야 할 요소 수가 N 이고 하나의 요소를 처리하는데 드는 비용이 Q라 하면 전체 스트림 파이프라인 처리 비용은 N*Q라 할 수 있는데, Q가 높아진다는 것은 병렬 스트림으로 성능 개선을 할 여지가 있음을 의미한다.
5.
소량의 데이터는 병렬 스트림이 도움되지 않는다. : 소량의 데이터는 병렬화 과정에 생기는 부가 비용이 더 크기에 순차 스트림을 사용하는게 더 효율적이다.
6.
적절한 자료구조인지 확인하자. : ArrayList같은 경우 요소를 탐색하지 않고도 리스트를 분할 할 수 있기에 병렬 스트림을 사용하기 적절한 반면 LinkedList는 분할하기위해 모든 요소를 탐색해야 하기에 적절하지 않다.
7.
스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 성능이 달라질 수 있다. : map이나 SIZED 스트림은 크기를 알고 있기에 스트림 분할을 할 수 있고 병렬 처리가 수월하지만, 필터 연산같은 경우 스트림의 길이를 예측할 수 없기 때문에 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.
8.
최종 연산의 병합 과정(ex: Collector의 combiner) 비용을 살펴보라 : 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있다.

참고: 스트림 소스와 분해성

소스
분해성
ArrayList
훌륭함
LinkedList
나쁨
IntStream.range
훌륭함
Stream.iterate
나쁨
HashSet
좋음
TreeSet
좋음

3. 포크/조인 프레임워크

포크/조인 프레임워크는 병렬화 할 수 있는 작업을 재귀적으로 작은 작업들로 쪼개서 분할한 다음 각각의 서브태스크들이 작업 수행한 뒤 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.
포크/조인 프레임워크는 서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExeecutorService 인터페이스를 구현한다.

RecursiveTask

스레드 풀을 이용하기 위해 RecursiveTask<R>의 서브 클래스를 만들어야 한다. 여기서 R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때 RecursiveAction 형식이다.
RecursiveTask를 구현하려면 추상 메서드인 compute를 구현해야 한다.
protected abstract R compute();
Java
복사
이 compute 메서드에서는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브 태스크의 결과를 생산할 알고리즘을 정의한다.
if(태스크가 충분히 작거나 더 이상 분할 할 수 없을 경우) { 순차적으로 태스크 계산 } else { 태스크를 두 서브태스크로 분할 태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출 모든 서브태스크의 연산이 완료될 때까지 기다린다. 각 서브태스크의 결과를 합침 }
Java
복사
compute 메서드 수도 코드
이제 실제로 병렬 합계를 구하는 ForkJoinSumCalculator를 RecursiveTask를 상속해 구현해보자.
public class ForkJoinSumCalculator extends RecursiveTask<Long> { private final long[] numbers; //서브태스크에서 처리할 배열의 초기/최종 위치 private final int start; private final int end; //서브 태스크의 분할 기준점 public static final long THRESHOLD = 10_000; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { //해당 태스크에서 계산할 길이가 기준보다 낮으면 순차적으로 결과를 계산해 반환한다. return computeSequentially(); } //배열의 첫 번째 절반을 더하도록 서브 태스크 생성 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); //ForkJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기 실행 leftTask.fork(); //배열의 나머지 절반을 더하도록 서브 태스크 생성 ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); //두 번째 서브 태스크를 동기 실행하고 이 때 추가로 분할이 일어날 수 있다. Long rightResult = rightTask.compute(); //첫 번째 서브태스크의 결과를 읽거나 아직 결과가 없으면 기다린다. Long leftResult = leftTask.join(); return rightResult + leftResult; } //더 분할할 수 없을 경우 서브태스크의 결과를 계산하는 알고리즘 private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static long forkJoinSum(int n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinSumCalculator task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool().invoke(task); } public static void main(String[] args) { long l = forkJoinSum(10_000_000); System.out.println("l = " + l); } }
Java
복사
이 ForkJoinSumCalculator를 실행하면 다음과 같은 흐름으로 진행된다.
포크/조인 알고리즘
1.
LongStream으로 1부터 n(10_000_000)까지의 배열을 생성한다.
2.
생성한 배열을 전달해 ForkJoinSumCalculator 태스크를 만든다.
3.
ForkJoinSumCalculator 를 ForkJoinPool로 전달한다.
4.
풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하며 작업을 수행한다.
5.
compute 메서드는 병렬로 실행할 수 있을만큼 태스크가 작아졌는지 확인하며, 태스크가 아직 크다고 생각하면 숫자를 반으로 분할해 새로운 ForkJoinSumCalculator로 할당한다.
6.
다시 ForkJoinPool이 새로 생성된 ForkJoinSumCalculator를 실행한다. 이 과정이 재귀적으로 반복되며 주어진 조건(TRASHOLD)을 만족할 때까지 태스크 분할을 반복한다.
7.
각 서브태스크는 순차적으로 처리되어 포킹 프로세스로 만들어진 이진 트리의 태스크를 루트에서 역순으로 방문하여 부분 결과를 합쳐 최종 결과를 계산해 반환한다.
일반적으로 둘 이상의 ForkJoinPool은 사용하지 않는다.
즉 소프트웨어의 필요한 곳에서 언제든 쓸 수 있도록 ForkJoinPool을 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장한다. 인수가 없는 디폴트 생성자를 이용했는데, 이는 JVM에서 이용할 수 있는 모든 프로세서가 자유롭게 풀에 접근할 수 있음을 의미한다.

참고: 포크/조인 프레임워크 100% 활용하기

join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시키기에 두 서브태스크가 모두 시작된 다음 join을 호출시켜야 한다.
RecursiveTask 내에서 ForkJoinPool의 invoke 메서드를 사용해선 안된다. 대신compute나 fork 메서드를 호출할 순 있다. invoke 메서드는 순차 코드에서 병렬 계산을 시작할 때만 사용한다.
서브태스크에서 fork 메서드로 ForkJoinPool의 일정을 조절할 수 있다. 두 서브태스크가 모두 fork()를 호출하는게 자연스러워 보이지만, 한 쪽 작업은 compute를 호출하는게 효율적이다. 이렇게 하면 두 서브태스크의 한 태스크에선 같은 스레드를 재사용할 수 있기에 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.
병렬 처리로 성능을 개선하기 위해서는 태스크를 여러 독립적인 서브태스크로 분리할 수 있어야 하고, 각 서브태스크의 실행시간이 새로운 태스크를 포킹하는데 드는 비용보다 커야 한다.

작업 훔치기

위에 작성한 ForkJoinSumCalculator는 서브태스크 기준인 숫자가 10_000이였다. 즉 천만 개 항목을 포함하는 배열을 사용할 경우 천 개 이상의 서브 태스크를 포크할 것이다. 하지만 대부분의 기기는 4 core 이기 때문에 천 개 이상의 태스크는 각각의 태스크가 CPU로 할당된다는 것을 생각해보면 의미가 없어보인다.
그런데, 포크/조인 프레임워크에선 작업 훔치기(work stealing) 라는 기법이 있다.
해당 기법에선 ForkJoinPool의 모든 스레드를 거의 공정하게 분할하고, 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하여 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다. 그렇기에 먼저 할 일을 끝낸 스레드는 유휴 상태로 바뀌는게 아니라 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다. 그리고 이런 과정이 모든 태스크가 작업이 끝날 때까지 반복되는데, 그렇기에 태스크의 크기를 작게 나눠야 작업자 스레드 간의 작업 부하를 비슷한 수준으로 맞출 수 있다.
따라서 천 개이상의 태스크를 분할하는것이 의미없는게 아닌게 되는 것이다.
그런데, 잘 생각해보면 스트림을 사용할 때 우리는 별도의 ForkJoinXXXX를 만들지 않고도 자동으로 병렬 스트림을 사용할 수 있었는데, 그건 어떻게 가능했던 것일까? 스트림을 자동으로 분할해주는 기능이 이미 존재하기 때문인데, 이 기능은 스트림을 분할하는 기법인 Spliterator을 이용하는 것이다.

4. Spliterator 인터페이스

JDK 8부터 제공되는 Spliterator 라는 인터페이스는 분할할 수 있는 반복자 라는 의미로 Iterator와 비슷하지만, 병렬 작업에 특화되어있다는 부분이 차이점이다.

Spliterator 인터페이스

public interface Spliterator<T> { boolean tryAdvanace(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); }
Java
복사
boolean tryAdvanace(Consumer<? super T> action);
: Spliterator의 요소를 하나씩 순차적으로 소비하며 탐색해야 하는 요소가 남아있는지 여부를 반환한다.
Spliterator<T> trySplit();
: Spliterator의 일부 요소를 분할해 두 번째 Spliterator를 생성하는 메서드.
long estimateSize();
: 탐색 해야 할 요소 수 정보를 제공하는 메서드.
characteristics();
: Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.

Spliterator 의 분할 과정

재귀 분할 과정
재귀적인 방식으로 일어나는 스트림 분할 과정은 다음과 같은 흐름으로 진행된다.
1.
1번째 Spliterator에 trySplit을 호출해 2번째 Spliterator가 생성된다.
2.
2개의 Spliterator에 trySplit을 다시 호출하면 4개의 Spliterator가 생성된다.
3.
1-2 과정을 trySplit이 null을 반환할 때까지 반복한다.
4.
Spliterator에 호출한 모든 trySplit의 결과가 null이면 분할 과정이 종료된다.

참고: Spliterator 특성

: characteristics 메서드로 Spliterator 자체의 특성 집합을 포함하는 int를 반환하는데, 다음 표를 참고해 적절한 특성들을 합해 반환해주면 된다.
특성
의미
ORDERED
리스트처럼 요소에 정해진 순서가 있기에 요소 탐색 및 분할시 이 순서에 주의해야 한다.
DISTINCT
x,y 두 요소를 방문했을 때 x.equals(y)는 false여야 한다.
SORTED
탐색된 요소는 미리 정의된 순서를 따른다.
SIZED
크기가 알려진 소스(ex: Set)로 Spliterator를 생성했기에 estimatedSize()는 정확한 값을 반환한다.
NONNULL
탐색하는 요소는 Not Null이다
IMMUTABLE
이 Spliterator의 소스는 불변으로 요소를 탐색하는 동안 CUD 동작을 할 수 없다.
CONCURRENT
동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.
SUBSIZED
이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED특성을 갖는다.

커스텀 Spliterator 만들기

문자열의 단어 수를 계산하는 단순한 메서드를 구현해보고 이를 Spliterator를 구현해서 적용해보자.

1. 반복형으로 만든 단어 수 카운팅 메서드

public static int countWordsIteratively(String s) { int counter = 0; boolean lastSpace = true; for (char c : s.toCharArray()) { if (Character.isWhitespace(c)) { lastSpace = true; } else { if (lastSpace) { counter++; } lastSpace = false; } } return counter; }
Java
복사
public static void main(String[] args) { final String SENTENCE = "Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura " + "ch la dritta via era smarrita "; System.out.println("Found " + CountWord.countWordsIteratively(SENTENCE) + " words"); }
Java
복사
전통적인 방식의 for-each문을 사용해서 문자를 탐색하면서 공백을 기준으로 단어의 수를 계산하는 로직으로 main함수로 내가 구현한 countWordsIteratively() 메서드를 실행해보면 Found 19 words 라는 실행 결과를 보여줄 것이다.

2. 함수형으로 만든 단어 수 카운팅 메서드

public class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } return lastSpace ? new WordCounter(counter + 1, false) : this; } public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } }
Java
복사
public static int countWord(Stream<Character> stream) { WordCounter counter = stream.reduce(new WordCounter(0, true), //초기 값 WordCounter::accumulate, //누적 로직 WordCounter::combine); //병합 로직 return counter.getCounter(); } public static void main(String[] args) { final String SENTENCE = "Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura " + "ch la dritta via era smarrita "; Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); System.out.println("result = " + countWord(stream)); }
Java
복사
스트림에서 리듀싱 연산을 수행할 때 지금까지 발견한 단어 수를 누적할 변수와 마지막 문자의 공백 여부를 기억하는 논리값 두 가지 변수가 필요한데 자바에서는 튜플이 없기에 두 정보를 저장하는 WordCounter라는 객체를 만들어서 제공한다.
이 방식으로도 반복형과 동일하게 Found 19 words 라는 결과를 반환할 것인데, 이제 병렬로 이 동작을 수행하는 방법에 대해 구현해보자.

3. WordCounter 병렬로 수행하기

위에서 만든 countWord에 그냥 stream.parallel()으로 인자를 전달하면, 19개가 아닌 이상한 결과가 반환될 것이다. 이는 순차 스트림을 병렬 스트림으로 바꾸는 과정에서 스트림 분할을 하는 위치에 따라 잘못된 값이 나올수도 있기 때문인데, 문자열을 임의의 위치가 아닌 단어가 끝나는 위치에서만 분할하는 방식을 사용해 이 문제를 해결할 수 있다.
//before case String a = "mi ritrovai in una selva oscura "; // 이 단어를 stream.parallel()로 분할했을 경우 String splitted_before_a = "mi ritrova in un"; String splitted_after_a = "a selva oscura";
Java
복사
이런식으로 문자열 중간이 짤리면서 단어가 추가 계산될 가능성이 생긴다.
그렇기에 이제 Spliterator를 구현하여 커스터마이징해서 내가 원하는대로 문자열을 분할하도록 해보자.
public class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<? super Character> action) { action.accept(string.charAt(currentChar++)); //현재 문자를 소비한다. return currentChar < string.length(); // 소비할 문자가 있으면 true를 반환한다. } @Override public Spliterator<Character> trySplit() { int currentSize = string.length(); // 파싱할 문자열을 순차 처리할 정도로 잘개 쪼갰을 경우 null을 반환한다. if (currentSize < 10) { return null; } //파싱할 문자열의 중간을 분할 위치로 설정한다. for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) { //다음 공백이 나올 때까지 분할 위치를 뒤로 이동시킨다. if (Character.isWhitespace(string.charAt(splitPos))) { //처음부터 분할 위치까지 문자열을 파싱할 새로운 WordCounterSpliterator를 생성한다. WordCounterSpliterator spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos)); currentChar = splitPos; //이 WordCounterSpliterator의 시작 위치를 분할 위치로 설정한다. return spliterator;//공백을 찾고 문자열을 분리했기에 루프를 종료한다. } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
Java
복사
public boolean tryAdvance(Consumer<? super Character> action) {…}
: 문자열에서 현재 인덱스에 해당하는 문자를 Consumer에 제공하고 인덱스를 증가시킨다. 인수로 전달된 Consumer는 스트림을 탐색하며 적용해야 하는 함수 집합이 작업을 처리할 수 있게 소비한 문자를 전달하는 자바 내부 클래스이다.
public Spliterator<Character> trySplit() {…}
: 반복될 자료구조를 분할하는 로직을 포함하는 메서드로 RecursiveTask의 compute 메서드와 같이 분할 동작을 중단할 한계를 설정해야한다. 그리고 분할 과정에서 기준점 이하일 경우 null을 반환해 분할을 중지하도록 지시해야 한다.
public long estimateSize() { … }
: 탐색해야 할 요소의 개수는 Spliterator가 파싱할 문자열 전체 길이와 현재 반복중인 위치의 차이를 반환한다.
public int characteristics() {…}
: Spliterator의 특성을 추가해주는 로직으로 다음 특성들이 추가된다.
ORDERED: 문자열의 문자 등장 순서가 유의미함
SIZED: estimatedSize 메서드의 반환값이 정확함
SUBSIZED: typSplit으로 생성된 Spliterator도 정확한 크기를 가진다.
NONNULL: 문자열은 NOT NULL이다.
IMMUTABLE: 문자열 자체가 불변 클래스이다.
이제 이렇게 구현한 커스텀 Spliterator를 활용해보자.
WordCounterSpliterator wordCounterSpliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(wordCounterSpliterator, true); System.out.println("countWord = " + countWord(stream));
Java
복사
StreamSupport.stream 팩토리 메서드는 두 번째 파라미터로 병렬 스트림 생성 여부를 지시한다.

결론

스트림을 사용하면서 병렬 스트림은 스트림을 조금만 써보더라도 고려해보고 써보고싶은 키워드였다.
하지만, 지금까진 이펙티브자바만 보고 마냥 썼다간 위험하니 쓰지말자! 정도로만 인지하고 있었는데, 회사내에서 대용량 엑셀 다운로드를 고려하면서 여러가지 방법을 고안해야 했고 그 중에서 병렬 스트림도 고려사항이였다. 처음에는 그냥 parallel() 메서드 하나만 써서 해보면 되지않을까? ExecutorService만 사용해서 스레드 할당받고 로직 돌리면되지 않을까 생각했는데, 생각보다 고려 포인트도 많고, 무의식중에 알고는 있던 내용들에 대해 다시 한 번 짚어보는 시간이였던 것 같다.
특히 Spliterator에 대해서는 감도 안오던 인터페이스였는데 이번 기회를 통해 학습해서 좋았고, 회사 프로젝트에 적용을 해볼 부분이 생기면 반드시 적용을 해봐야겠다.

참고