1. 자바 Concurrent 프로그래밍 소개
Concurrent 소프트웨어
동시에 여러 작업을 할 수 있는 소프트웨어
•
예) 인터넷 강의를 보면서 notion으로 필기를 하고 그와중에 IntelliJ로 코딩을 할 수 있다.
자바에서 지원하는 Cuncurrent 프로그래밍
•
멀티프로세싱(ProcessBuilder)
•
멀티쓰레드
자바 멀티쓰레드 프로그래밍
•
Thread/Runnable
예제
public class App {
public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
//Anonymous class
Thread runnable = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Runnable Thread: " + Thread.currentThread().getName());
}
});
//Lambda Expression
Thread lambdaThread = new Thread(() -> System.out.println("Lambda Thread:" + Thread.currentThread().getName()));
System.out.println("Hello: "+Thread.currentThread().getName());
}
//Thread 상속
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread: "+Thread.currentThread().getName());
}
}
}
Java
복사
[실행 결과]
Hello: main
Thread: Thread-0
⇒코드의 실행 순서만 봐서는 Thread가 먼저 출력되야 할 것 같지만, 실제로 실행해보면 다르게 출력될 때도 있습니다. 이를 통해 Thread는 순서를 보장하지 않는다는 것을 알 수 있습니다.
여기선 로컬클래스를 이용했지만, 익명클래스와 람다표현식을 이용해서도 적용할 수 있습니다.
주요기능(method)
•
sleep(mills)
⇒ 현재 쓰레드 재우기(멈춰두기): 스레드를 대기상태로 멈춰서 다른 스레드가 처리할 수 있도록 합니다. 하지만 락을 놔주진 않기에 자칫하면 데드락 상태에 걸릴 수 있습니다.
public static void main(String[] args) throws InterruptedException {
//Lambda Expression
Thread lambdaThread = new Thread(() -> {
try{
Thread.sleep(1000L);
}catch(InterruptedException e){
System.out.println("interrupted!");
return;
}
System.out.println("Thread: "+Thread.currentThread().getName());
});
lambdaThread.start();
System.out.println("Hello: "+Thread.currentThread().getName());
}
Java
복사
1.
Thread.sleep(1000L)
⇒Thread를 start하면 1초(1000L)동안 멈춰있고 그 동안 다른 쓰레드를 수행하기 때문에 Hello가 항상 우선 출력됩니다.
•
interrupt()
⇒ 다른 쓰레드를 깨워 InterruptException 을 발생시킵니다. 이 에러에 대한 핸들링은 코딩을 통해 구현할 수 있습니다.
public static void main(String[] args) throws InterruptedException {
//Lambda Expression
Thread lambdaThread = new Thread(() -> {
try{
Thread.sleep(3000L);
}catch(InterruptedException e){
System.out.println("interrupted!");
return;
}
System.out.println("Thread: "+Thread.currentThread().getName());
});
lambdaThread.start();
lambdaThread.interrupt();
System.out.println("Hello: "+Thread.currentThread().getName());
}
Java
복사
1.
lambdaThread.interrupt();
⇒ lambdaThread에 interrupt()메소드를 호출해 lambdaThread내에 InterruptedException 을 발생시킵니다.
•
join()
⇒ 다른 쓰레드가 끝날 때까지 기다립니다.
public static void main(String[] args) throws InterruptedException {
//Lambda Expression
Thread lambdaThread = new Thread(() -> {
try{
Thread.sleep(3000L);
}catch(InterruptedException e){
System.out.println("interrupted!");
return;
}
System.out.println("Thread: "+Thread.currentThread().getName());
});
lambdaThread.start();
lambdaThread.join();
System.out.println("Hello: "+Thread.currentThread().getName());
}
Java
복사
1.
lambdaThread.join();
⇒ lambdaThread에 join()메소드를 호출하여 lambdaThread가 종료될 때까지 기다립니다.
2. Executors
: Runnable이나 Thread와 같은 Low-level이아닌 고 수준(High-Level) Concurrency 프로그래밍
우리가 Runnable만 정의해서 제공해주면 스레드를 만들고, 불필요해지면 종료하고 관리해주는 작업들을 대신 해주는 클래스
Executors가 하는 일
•
쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어관리한다.
•
쓰레드 관리: 쓰레드 생명 주기를 관리한다.
•
작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.
주요 인터페이스
•
Executor: execute(Runnable)
•
ExecutorService: Executor를 상속 받은 인터페이스로, Callable도 실행 가능하며 Executor를 종료 시키거나 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.
•
ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업 실행할 수 있다.
예제
•
기본 사용 예제
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
//Legacy case
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("Thread: "+Thread.currentThread().getName());
}
});
//Lambda Expression
executorService.submit(()->{
System.out.println("Lambda Expression Thread: "+Thread.currentThread().getName());
});
executorService.shutdown(); //graceful shutdown
//executorService.shutdownNow(); //즉시 종료
}
Java
복사
•
2개의 Thread를 이용하여 실행
public class App {
private static Runnable getRunnable(String message) {
return ()->System.out.println(message + Thread.currentThread().getName());
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(getRunnable("Hello"));
executorService.submit(getRunnable("World"));
executorService.submit(getRunnable("The"));
executorService.submit(getRunnable("Java"));
executorService.submit(getRunnable("Lecture"));
executorService.submit(getRunnable("Concurrent"));
executorService.submit(getRunnable("Part"));
executorService.shutdown(); //graceful shutdown
}
}
Java
복사
[실행 결과]
1.
Executors.newFixedThreadPool(2)
⇒ 해당 메소드를 호출하면 해당 영역에는 인자값으로 넘겨준 숫자만큼 Thread를 관리합니다. 위 코드에서는 2를 인자값으로 넘겨줬기 때문에 2개의 2개의 쓰레드를 관리하는 Thread Pool이 도는동안 Blocking Queue에 등록된 작업들이 차례대로 동작합니다.
•
ExecutorService.newSingleThreadScheduledExcutor();
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(getRunnable("hello"), 3, 1, TimeUnit.SECONDS);
Java
복사
1.
scheduleAtFixedRate(실행 Runnable, 시작 지연 시간, 딜레이, 파라미터 시간 단위)
⇒ 위 코드에서는 Runnable타입을 반환하는 getRunnable() 메소드를 프로그램이 시작 후 3초 뒤부터 1초마다 수행하는 코드입니다.
3. Callable과 Future
Callable
•
Runnable과 거의 유사하지만 반환값을 가질 수 있다.
•
get() 메소드를 통해 값을 반환받을 수 있습니다.
◦
해당 메소드는 블록킹 콜이기에 메소드 호출시점부터 코드실행 완료까지 기다립니다.
◦
타임아웃을 설정할 수 있습니다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> submit = executorService.submit(hello);
System.out.println("Started!");
submit.get();//blocking
System.out.println("End");
executorService.shutdown();
Java
복사
[실행 결과]
•
isDone()메소드를 통해 작업 상태를 확인할 수 있습니다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");
helloFuture.get();//blocking
System.out.println(helloFuture.isDone());
System.out.println("End");
executorService.shutdown();
Java
복사
[실행 결과]
get()을 호출하기전에는 isDone()이 false로 아직 수행되지 않은걸 알 수 있고, get()호출 이후 시점에서는 isDone()이 true인걸로 완료되었다는사실을 알 수 있습니다.
•
cancel() 메소드로 작업을 취소할 수도 있습니다.
⇒ 인자값으로 현재 진행중인 쓰레드 interrupt 여부를 결정합니다. true면 현재 진행중인 쓰레드를 interrupt하고 그렇지 않으면 현재 진행중인 작업이 끝날때까지 기다립니다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");
helloFuture.cancel(false);
System.out.println(helloFuture.isDone());
System.out.println("End");
helloFuture.get();
executorService.shutdown();
Java
복사
[실행 결과]
1.
helloFuture.cancel(false)
⇒ 현재 진행중인 작업을 기다린 뒤 작업을 취소합니다. 작업을 취소되어 종료되었기 때문에 밑에 helloFuture.isDone()은 true가 반환되며, 이미 취소한 작업을 get() 호출하는 시점에는 CancellationException 예외가 발생합니다.
•
invokeAll() 메소드를 호출해 여러 작업들을 동시에 실행할수도 있습니다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
LocalDateTime start = LocalDateTime.now();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Callable<String> java = () ->{
Thread.sleep(3000L);
return "java";
};
Callable<String> catsbi = () ->{
Thread.sleep(1000L);
return "catsbi";
};
List<Future<String>> futures =
executorService.invokeAll(Arrays.asList(hello, java, catsbi));
for (Future<String> future : futures) {
System.out.println(future.get());
}
LocalDateTime end = LocalDateTime.now();
Duration between = Duration.between(start, end);
System.out.println(between.getSeconds());
Java
복사
[실행 결과]
1.
출력 결과
⇒ 1초,2초, 3초짜리 Callbable 들이 수행되며 차례대로 출력되는게 아닌 전체가 끝날때까지 기다렸다가 한 번에 출력되었습니다. 그 이유는 invokeAll()에서는 어느 하나의 작업이 끝나더라도 다른 작업이 끝날때까지 기다렸다가 모든 작업이 끝나면 결과를 가져오기 때문입니다.
2.
between.getSeconds()
⇒ 1초, 2초, 3초 짜리 작업을 각각 수행하며 끝날때까지 기다렸다가 출력하는데, cuncurrency하다면 가장 긴 작업시간인 3초가 지났을때 모든 작업이 끝나야 합니다. 하지만 출력결과는 6초였습니다. 그 이유는 ExecutorService에 할당된 thread가 싱글쓰레드이기 때문에 내부에서 3개의 작업들이 각각 쓰레드에 할당되어 수행되는게아닌 하나의 쓰레드에 하나씩 작업 할당및 작업을 하며 대기하기 때문입니다.
이를 해결하기 위해서는 newSingleThreadExecutor() 가 아닌 newFixedThreadFool(3)을 통해 3개의 쓰레드를 할당해주면 3초가 출력되는 것을 확인할 수 있습니다
3.
invokeAll
⇒ invokeAll() 메소드는 태스크가 모두 끝날때까지 기다렸다가 값들을 반환합니다. 하지만, 가장 먼저 완료된 작업만 반환해도 괜찮다면 invokeAll을 쓰기에는 성능이 떨어집니다.
그럴 때 사용 할 수 있는 메소드가 invokeAny입니다. 이 때 주의할 점은 Executors에 쓰레드가 하나만 할당되어있다면 결국 하나가 끝나야 다음 작업이 수행되는데 이 때 가장 처음 할당되는 작업이 오래걸리는 작업이라면 효과가 없습니다. 그래서 적절히 쓰레드를 할당해줘야 합니다.
String result = executorService.invokeAny(Arrays.asList(hello, java, catsbi));
System.out.println("result = " + result);
Java
복사
[실행 결과]
4. CompletableFuture 1
개요
자바에서 비동기(Asynchronous)프로그래밍을 가능하게하는 인터페이스.
Future의 제약사항들을 해결한다.
Future 제약
•
예외 처리용 API를 제공하지 않습니다.
•
여러 Future를 조합할 수 없습니다. (ex: Event정보를 받아 다음 Event에 참석할 회원목록조회)
•
Future를 외부에서 완료시킬 수 없습니다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
•
get()을 호출하기 전까지는 future를 다룰 수 없습니다.
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(() -> "hello");
...//TODO
future.get();
...//TODO
Java
복사
⇒ 여기서 future는 blocking call입니다. 이 말은 future를 get()으로 가져오는 동안에는 다른 작업들의 수행이 안된다는 의미이고 그 기간이 길어질수록 성능은 떨어질수밖에 없습니다.
CompletableFuture
•
Future와 CompletionStage를 구현하는 구현체
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{...}
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("catsbi");
System.out.println("future = " + future.get());
//-------------OR --------------
CompletableFuture<String> future = CompletableFuture.completedFuture("catsbi");
System.out.println("future = " + future.get());
Java
복사
비동기로 작업 실행하기
•
리턴값이 없는 경우::runAsync()
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Hello" + Thread.currentThread().getName());
});
future.get();
Java
복사
•
리턴값이 있는 경우::supplyAsync()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello" + Thread.currentThread().getName());
return "Hello";
});
future.get();
Java
복사
콜백 제공하기
•
리턴값을 받아서 다른 값으로 바꾸는 콜백
thenApply(Function)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello" + Thread.currentThread().getName());
return "Hello";
}).thenApply((s)->{
System.out.println("content: "+s);
System.out.println(Thread.currentThread().getName());
return "HelloAsync";
});
System.out.println(future.get());
Java
복사
⇒ Javascript의 Promise 와 유사한 형태입니다. supplyAsync의 람다표현식에서 반환된 Hello라는 값은 체이닝된 메소드 thenApply의 인자값으로 들어가고 사용 가능합니다. 그리고 더 이상 체이닝된 메소드가 없기에 return값인 HelloAsync는 반환되어 future로 들어가고
get()을통해 받을 수 있습니다.
•
리턴값을 받아 또 다른 작업을 수행하는데 반환값은 없는 콜백
thenAccept(Consumer)
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello" + Thread.currentThread().getName());
return "Hello";
}).thenAccept((s)->{
System.out.println("content: "+s);
System.out.println(Thread.currentThread().getName());
});
future.get();
Java
복사
•
리턴값을 받지 않고 다른 작업을 수행하는 콜백
thenRun(Runnable)
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello" + Thread.currentThread().getName());
return "hello";
}).thenRun(()->{
System.out.println(Thread.currentThread().getName());
});
future.get();
Java
복사
어떻게 thread pool을 만들지 않고 별도의 쓰레드에서 동작하는가
: because of ForkJoinPool
⇒ Dequeue를 사용해 자신의 쓰레드가 할 일이 없을 경우 Dequeue에서 할 일을 가져와서 처리를 하는 방식의 프레임워크
이런 방식으로 CommonPool이라는 곳에서 쓰레드를 가져와 사용합니다. 물론, 여기서 따로 구현한 쓰레드를 할당할 수도 있습니다.
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello" + Thread.currentThread().getName());
return "hello";
}, executorService).thenRunAsync(()->{
System.out.println(Thread.currentThread().getName());
}, executorService);
future.get();
executorService.shutdown();
Java
복사
1.
ExecutorService executorService = Executors.newFixedThreadPool(4);
⇒ CompletableFuture에서 사용할 쓰레드풀을 만들어줍니다.
2.
CompletableFuture.supplyAsync(() -> {...}, executorService)
⇒ 파라미터로 별도의 쓰레드 풀을 넣어주면 해당 쓰레드풀을 사용합니다.runAsync에서도 사용 가능합니다.
3.
.thenRunAsync(()->{...}, executorService);
⇒별도의 쓰레드풀을 사용하고싶으면 기존의 콜백메소드 의 접미사(suffix)로 Async를 붙혀서 사용하면 됩니다(ex: thenRunAsync, thenAcceptAsync, thenApplyAsync)
5. CompletableFuture 2
CompletableFuture 조합 메소드
1.
thenCompose()
⇒ 두 작업이 서로 이어서 실행하도록 조합하며 연관된 future간에 많이 사용합니다.
public class App {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture);
System.out.println(future.get());
}
private static CompletableFuture<String> getWorldFuture(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World " + Thread.currentThread().getName());
return message + " World";
});
}
}
Java
복사
[실행 결과]
2.
thenCombine()
⇒ 두 작업을 독립적으로 실행하고 둘 다 종료 했을때 콜백 실행합니다.
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
});
CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
CompletableFuture<String> resultFuture =
msFuture.thenCombine(appleFuture, (i, j) -> i + " " + j);
System.out.println(resultFuture.get());
}
Java
복사
3.
allOf()
⇒여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
});
CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
List<CompletableFuture<String>> futures = Arrays.asList(msFuture, appleFuture);
CompletableFuture<List<String>> results =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
results.get().forEach(System.out::println);
}
Java
복사
4.
anyOf()
⇒ 여러 작업 중 가장 끝난 하나의 결과를 콜백에 넘겨 실행합니다.
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
});
CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
CompletableFuture<Void> future = CompletableFuture.anyOf(msFuture, appleFuture).thenAccept(System.out::println);
future.get();
Java
복사
예외 처리 메소드
1.
exeptionally(Function)
boolean throwError = true;
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
}).exceptionally(ex->{
System.out.println(ex);
return "Error";
});
System.out.println(msFuture.get());
Java
복사
[실행 결과]
2.
handle(BiFunction)
boolean throwError = false;
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
}).handle((result, ex)->{
if (Objects.nonNull(ex)) {
System.out.println(ex);
return "ERROR";
}
return result;
});
System.out.println(msFuture.get());
Java
복사