Search

CompletableFuture

1. 자바 Concurrent 프로그래밍 소개

Concurrent 소프트웨어

동시에 여러 작업을 할 수 있는 소프트웨어
예) 인터넷 강의를 보면서 notion으로 필기를 하고 그와중에 IntelliJ로 코딩을 할 수 있다.

자바에서 지원하는 Cuncurrent 프로그래밍

멀티프로세싱(ProcessBuilder)
멀티쓰레드

자바 멀티쓰레드 프로그래밍

Thread/Runnable

예제

public class App { public static void main(String[] args) { MyThread myThread = new MyThread(); myThread.start(); //Anonymous class Thread runnable = new Thread(new Runnable() { @Override public void run() { System.out.println("Runnable Thread: " + Thread.currentThread().getName()); } }); //Lambda Expression Thread lambdaThread = new Thread(() -> System.out.println("Lambda Thread:" + Thread.currentThread().getName())); System.out.println("Hello: "+Thread.currentThread().getName()); } //Thread 상속 static class MyThread extends Thread { @Override public void run() { System.out.println("Thread: "+Thread.currentThread().getName()); } } }
Java
복사
[실행 결과]
Hello: main Thread: Thread-0
⇒코드의 실행 순서만 봐서는 Thread가 먼저 출력되야 할 것 같지만, 실제로 실행해보면 다르게 출력될 때도 있습니다. 이를 통해 Thread는 순서를 보장하지 않는다는 것을 알 수 있습니다.
여기선 로컬클래스를 이용했지만, 익명클래스와 람다표현식을 이용해서도 적용할 수 있습니다.

주요기능(method)

sleep(mills)
⇒ 현재 쓰레드 재우기(멈춰두기): 스레드를 대기상태로 멈춰서 다른 스레드가 처리할 수 있도록 합니다. 하지만 락을 놔주진 않기에 자칫하면 데드락 상태에 걸릴 수 있습니다.
public static void main(String[] args) throws InterruptedException { //Lambda Expression Thread lambdaThread = new Thread(() -> { try{ Thread.sleep(1000L); }catch(InterruptedException e){ System.out.println("interrupted!"); return; } System.out.println("Thread: "+Thread.currentThread().getName()); }); lambdaThread.start(); System.out.println("Hello: "+Thread.currentThread().getName()); }
Java
복사
1.
Thread.sleep(1000L)
⇒Thread를 start하면 1초(1000L)동안 멈춰있고 그 동안 다른 쓰레드를 수행하기 때문에 Hello가 항상 우선 출력됩니다.
interrupt()
⇒ 다른 쓰레드를 깨워 InterruptException 을 발생시킵니다. 이 에러에 대한 핸들링은 코딩을 통해 구현할 수 있습니다.
public static void main(String[] args) throws InterruptedException { //Lambda Expression Thread lambdaThread = new Thread(() -> { try{ Thread.sleep(3000L); }catch(InterruptedException e){ System.out.println("interrupted!"); return; } System.out.println("Thread: "+Thread.currentThread().getName()); }); lambdaThread.start(); lambdaThread.interrupt(); System.out.println("Hello: "+Thread.currentThread().getName()); }
Java
복사
1.
lambdaThread.interrupt();
⇒ lambdaThread에 interrupt()메소드를 호출해 lambdaThread내에 InterruptedException 을 발생시킵니다.
join()
⇒ 다른 쓰레드가 끝날 때까지 기다립니다.
public static void main(String[] args) throws InterruptedException { //Lambda Expression Thread lambdaThread = new Thread(() -> { try{ Thread.sleep(3000L); }catch(InterruptedException e){ System.out.println("interrupted!"); return; } System.out.println("Thread: "+Thread.currentThread().getName()); }); lambdaThread.start(); lambdaThread.join(); System.out.println("Hello: "+Thread.currentThread().getName()); }
Java
복사
1.
lambdaThread.join();
⇒ lambdaThread에 join()메소드를 호출하여 lambdaThread가 종료될 때까지 기다립니다.

2. Executors

: Runnable이나 Thread와 같은 Low-level이아닌 고 수준(High-Level) Concurrency 프로그래밍
우리가 Runnable만 정의해서 제공해주면 스레드를 만들고, 불필요해지면 종료하고 관리해주는 작업들을 대신 해주는 클래스

Executors가 하는 일

쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어관리한다.
쓰레드 관리: 쓰레드 생명 주기를 관리한다.
작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.

주요 인터페이스

Executor: execute(Runnable)
ExecutorService: Executor를 상속 받은 인터페이스로, Callable도 실행 가능하며 Executor를 종료 시키거나 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.
ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업 실행할 수 있다.

예제

기본 사용 예제
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); //Legacy case executorService.execute(new Runnable() { @Override public void run() { System.out.println("Thread: "+Thread.currentThread().getName()); } }); //Lambda Expression executorService.submit(()->{ System.out.println("Lambda Expression Thread: "+Thread.currentThread().getName()); }); executorService.shutdown(); //graceful shutdown //executorService.shutdownNow(); //즉시 종료 }
Java
복사
2개의 Thread를 이용하여 실행
public class App { private static Runnable getRunnable(String message) { return ()->System.out.println(message + Thread.currentThread().getName()); } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(getRunnable("Hello")); executorService.submit(getRunnable("World")); executorService.submit(getRunnable("The")); executorService.submit(getRunnable("Java")); executorService.submit(getRunnable("Lecture")); executorService.submit(getRunnable("Concurrent")); executorService.submit(getRunnable("Part")); executorService.shutdown(); //graceful shutdown } }
Java
복사
[실행 결과]
1.
Executors.newFixedThreadPool(2)
⇒ 해당 메소드를 호출하면 해당 영역에는 인자값으로 넘겨준 숫자만큼 Thread를 관리합니다. 위 코드에서는 2를 인자값으로 넘겨줬기 때문에 2개의 2개의 쓰레드를 관리하는 Thread Pool이 도는동안 Blocking Queue에 등록된 작업들이 차례대로 동작합니다.
ExecutorService.newSingleThreadScheduledExcutor();
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleAtFixedRate(getRunnable("hello"), 3, 1, TimeUnit.SECONDS);
Java
복사
1.
scheduleAtFixedRate(실행 Runnable, 시작 지연 시간, 딜레이, 파라미터 시간 단위)
⇒ 위 코드에서는 Runnable타입을 반환하는 getRunnable() 메소드를 프로그램이 시작 후 3초 뒤부터 1초마다 수행하는 코드입니다.

3. Callable과 Future

Callable

Runnable과 거의 유사하지만 반환값을 가질 수 있다.
get() 메소드를 통해 값을 반환받을 수 있습니다.
해당 메소드는 블록킹 콜이기에 메소드 호출시점부터 코드실행 완료까지 기다립니다.
타임아웃을 설정할 수 있습니다.
ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () ->{ Thread.sleep(2000L); return "Hello"; }; Future<String> submit = executorService.submit(hello); System.out.println("Started!"); submit.get();//blocking System.out.println("End"); executorService.shutdown();
Java
복사
[실행 결과]
isDone()메소드를 통해 작업 상태를 확인할 수 있습니다.
ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () ->{ Thread.sleep(2000L); return "Hello"; }; Future<String> helloFuture = executorService.submit(hello); System.out.println(helloFuture.isDone()); System.out.println("Started!"); helloFuture.get();//blocking System.out.println(helloFuture.isDone()); System.out.println("End"); executorService.shutdown();
Java
복사
[실행 결과]
get()을 호출하기전에는 isDone()false로 아직 수행되지 않은걸 알 수 있고, get()호출 이후 시점에서는 isDone()true인걸로 완료되었다는사실을 알 수 있습니다.
cancel() 메소드로 작업을 취소할 수도 있습니다.
⇒ 인자값으로 현재 진행중인 쓰레드 interrupt 여부를 결정합니다. true면 현재 진행중인 쓰레드를 interrupt하고 그렇지 않으면 현재 진행중인 작업이 끝날때까지 기다립니다.
ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () ->{ Thread.sleep(2000L); return "Hello"; }; Future<String> helloFuture = executorService.submit(hello); System.out.println(helloFuture.isDone()); System.out.println("Started!"); helloFuture.cancel(false); System.out.println(helloFuture.isDone()); System.out.println("End"); helloFuture.get(); executorService.shutdown();
Java
복사
[실행 결과]
1.
helloFuture.cancel(false)
⇒ 현재 진행중인 작업을 기다린 뒤 작업을 취소합니다. 작업을 취소되어 종료되었기 때문에 밑에 helloFuture.isDone()true가 반환되며, 이미 취소한 작업을 get() 호출하는 시점에는 CancellationException 예외가 발생합니다.
invokeAll() 메소드를 호출해 여러 작업들을 동시에 실행할수도 있습니다.
ExecutorService executorService = Executors.newSingleThreadExecutor(); LocalDateTime start = LocalDateTime.now(); Callable<String> hello = () ->{ Thread.sleep(2000L); return "Hello"; }; Callable<String> java = () ->{ Thread.sleep(3000L); return "java"; }; Callable<String> catsbi = () ->{ Thread.sleep(1000L); return "catsbi"; }; List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, catsbi)); for (Future<String> future : futures) { System.out.println(future.get()); } LocalDateTime end = LocalDateTime.now(); Duration between = Duration.between(start, end); System.out.println(between.getSeconds());
Java
복사
[실행 결과]
1.
출력 결과
⇒ 1초,2초, 3초짜리 Callbable 들이 수행되며 차례대로 출력되는게 아닌 전체가 끝날때까지 기다렸다가 한 번에 출력되었습니다. 그 이유는 invokeAll()에서는 어느 하나의 작업이 끝나더라도 다른 작업이 끝날때까지 기다렸다가 모든 작업이 끝나면 결과를 가져오기 때문입니다.
2.
between.getSeconds()
⇒ 1초, 2초, 3초 짜리 작업을 각각 수행하며 끝날때까지 기다렸다가 출력하는데, cuncurrency하다면 가장 긴 작업시간인 3초가 지났을때 모든 작업이 끝나야 합니다. 하지만 출력결과는 6초였습니다. 그 이유는 ExecutorService에 할당된 thread가 싱글쓰레드이기 때문에 내부에서 3개의 작업들이 각각 쓰레드에 할당되어 수행되는게아닌 하나의 쓰레드에 하나씩 작업 할당및 작업을 하며 대기하기 때문입니다.
이를 해결하기 위해서는 newSingleThreadExecutor() 가 아닌 newFixedThreadFool(3)을 통해 3개의 쓰레드를 할당해주면 3초가 출력되는 것을 확인할 수 있습니다
3.
invokeAll
invokeAll() 메소드는 태스크가 모두 끝날때까지 기다렸다가 값들을 반환합니다. 하지만, 가장 먼저 완료된 작업만 반환해도 괜찮다면 invokeAll을 쓰기에는 성능이 떨어집니다.
그럴 때 사용 할 수 있는 메소드가 invokeAny입니다. 이 때 주의할 점은 Executors에 쓰레드가 하나만 할당되어있다면 결국 하나가 끝나야 다음 작업이 수행되는데 이 때 가장 처음 할당되는 작업이 오래걸리는 작업이라면 효과가 없습니다. 그래서 적절히 쓰레드를 할당해줘야 합니다.
String result = executorService.invokeAny(Arrays.asList(hello, java, catsbi)); System.out.println("result = " + result);
Java
복사
[실행 결과]

4. CompletableFuture 1

개요

자바에서 비동기(Asynchronous)프로그래밍을 가능하게하는 인터페이스.
Future의 제약사항들을 해결한다.

Future 제약

예외 처리용 API를 제공하지 않습니다.
여러 Future를 조합할 수 없습니다. (ex: Event정보를 받아 다음 Event에 참석할 회원목록조회)
Future를 외부에서 완료시킬 수 없습니다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
get()을 호출하기 전까지는 future를 다룰 수 없습니다.
ExecutorService executorService = Executors.newFixedThreadPool(4); Future<String> future = executorService.submit(() -> "hello"); ...//TODO future.get(); ...//TODO
Java
복사
⇒ 여기서 futureblocking call입니다. 이 말은 future를 get()으로 가져오는 동안에는 다른 작업들의 수행이 안된다는 의미이고 그 기간이 길어질수록 성능은 떨어질수밖에 없습니다.

CompletableFuture

Future와 CompletionStage를 구현하는 구현체
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{...}
CompletableFuture<String> future = new CompletableFuture<>(); future.complete("catsbi"); System.out.println("future = " + future.get()); //-------------OR -------------- CompletableFuture<String> future = CompletableFuture.completedFuture("catsbi"); System.out.println("future = " + future.get());
Java
복사

비동기로 작업 실행하기

리턴값이 없는 경우::runAsync()
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); }); future.get();
Java
복사
리턴값이 있는 경우::supplyAsync()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "Hello"; }); future.get();
Java
복사

콜백 제공하기

리턴값을 받아서 다른 값으로 바꾸는 콜백
thenApply(Function)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "Hello"; }).thenApply((s)->{ System.out.println("content: "+s); System.out.println(Thread.currentThread().getName()); return "HelloAsync"; }); System.out.println(future.get());
Java
복사
JavascriptPromise 와 유사한 형태입니다. supplyAsync의 람다표현식에서 반환된 Hello라는 값은 체이닝된 메소드 thenApply의 인자값으로 들어가고 사용 가능합니다. 그리고 더 이상 체이닝된 메소드가 없기에 return값인 HelloAsync는 반환되어 future로 들어가고
get()을통해 받을 수 있습니다.
리턴값을 받아 또 다른 작업을 수행하는데 반환값은 없는 콜백
thenAccept(Consumer)
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "Hello"; }).thenAccept((s)->{ System.out.println("content: "+s); System.out.println(Thread.currentThread().getName()); }); future.get();
Java
복사
리턴값을 받지 않고 다른 작업을 수행하는 콜백
thenRun(Runnable)
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "hello"; }).thenRun(()->{ System.out.println(Thread.currentThread().getName()); }); future.get();
Java
복사

어떻게 thread pool을 만들지 않고 별도의 쓰레드에서 동작하는가

: because of ForkJoinPool
⇒ Dequeue를 사용해 자신의 쓰레드가 할 일이 없을 경우 Dequeue에서 할 일을 가져와서 처리를 하는 방식의 프레임워크
이런 방식으로 CommonPool이라는 곳에서 쓰레드를 가져와 사용합니다. 물론, 여기서 따로 구현한 쓰레드를 할당할 수도 있습니다.
ExecutorService executorService = Executors.newFixedThreadPool(4); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "hello"; }, executorService).thenRunAsync(()->{ System.out.println(Thread.currentThread().getName()); }, executorService); future.get(); executorService.shutdown();
Java
복사
1.
ExecutorService executorService = Executors.newFixedThreadPool(4);
⇒ CompletableFuture에서 사용할 쓰레드풀을 만들어줍니다.
2.
CompletableFuture.supplyAsync(() -> {...}, executorService)
⇒ 파라미터로 별도의 쓰레드 풀을 넣어주면 해당 쓰레드풀을 사용합니다.runAsync에서도 사용 가능합니다.
3.
.thenRunAsync(()->{...}, executorService);
⇒별도의 쓰레드풀을 사용하고싶으면 기존의 콜백메소드 의 접미사(suffix)로 Async를 붙혀서 사용하면 됩니다(ex: thenRunAsync, thenAcceptAsync, thenApplyAsync)

5. CompletableFuture 2

CompletableFuture 조합 메소드

1.
thenCompose() ⇒ 두 작업이 서로 이어서 실행하도록 조합하며 연관된 future간에 많이 사용합니다.
public class App { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture); System.out.println(future.get()); } private static CompletableFuture<String> getWorldFuture(String message) { return CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName()); return message + " World"; }); } }
Java
복사
[실행 결과]
2.
thenCombine() ⇒ 두 작업을 독립적으로 실행하고 둘 다 종료 했을때 콜백 실행합니다.
public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> { System.out.println("MicroSoft " + Thread.currentThread().getName()); return "MicroSoft"; }); CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Apple " + Thread.currentThread().getName()); return "Apple"; }); CompletableFuture<String> resultFuture = msFuture.thenCombine(appleFuture, (i, j) -> i + " " + j); System.out.println(resultFuture.get()); }
Java
복사
3.
allOf()
⇒여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> { System.out.println("MicroSoft " + Thread.currentThread().getName()); return "MicroSoft"; }); CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Apple " + Thread.currentThread().getName()); return "Apple"; }); List<CompletableFuture<String>> futures = Arrays.asList(msFuture, appleFuture); CompletableFuture<List<String>> results = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .thenApply(v -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); results.get().forEach(System.out::println); }
Java
복사
4.
anyOf()
⇒ 여러 작업 중 가장 끝난 하나의 결과를 콜백에 넘겨 실행합니다.
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> { System.out.println("MicroSoft " + Thread.currentThread().getName()); return "MicroSoft"; }); CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Apple " + Thread.currentThread().getName()); return "Apple"; }); CompletableFuture<Void> future = CompletableFuture.anyOf(msFuture, appleFuture).thenAccept(System.out::println); future.get();
Java
복사

예외 처리 메소드

1.
exeptionally(Function)
boolean throwError = true; CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> { if (throwError) { throw new IllegalArgumentException(); } System.out.println("MicroSoft " + Thread.currentThread().getName()); return "MicroSoft"; }).exceptionally(ex->{ System.out.println(ex); return "Error"; }); System.out.println(msFuture.get());
Java
복사
[실행 결과]
2.
handle(BiFunction)
boolean throwError = false; CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> { if (throwError) { throw new IllegalArgumentException(); } System.out.println("MicroSoft " + Thread.currentThread().getName()); return "MicroSoft"; }).handle((result, ex)->{ if (Objects.nonNull(ex)) { System.out.println(ex); return "ERROR"; } return result; }); System.out.println(msFuture.get());
Java
복사