사용자 도구

사이트 도구


java:concurrent:executorservice

Java ExecutorService

최적의 쓰레드 풀 수

Java Concurrency in Practice에 나오는 내용

N쓰레드 = N씨피유 * U씨피유 * (1 + W/C)
  • N씨피유 : Runtime.getRuntime().availableProcessors()가 반환하는 Core 갯수
  • U씨피유 : 0~1 사이의 값을 갖는 CPU 활용 비율
  • W/C는 대기시간(I/O 시간 등)과 계산 시간(CPU가 직접 작동하는 시간)의 비율. I/O가 높으면 100, I/O는 낮고 CPU 활용시간이 많으면 0에 가깝게.
  • 쓰레드 수가 너무 많으면 오히려 서버가 크래시 될 수 있으므로 하나의 Executor에서 사용할 쓰레드의 최대 개수는 100이하로 설정하는게 바람직하다

Basic ExecutorService shutdown Pattern

executorService.shutdown();
try {
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    } 
} catch (InterruptedException e) {
    executorService.shutdownNow();
}
 

CahcedThreadPool

ExecutorService executorService = Executors.newCachedThreadPool();
  • 이미 쉬고 있는 쓰레드가 있으면 재사용하고 Pool이 꽉차 있으면 새로운 쓰레드를 생성한다.
  • 60초 동안 쉰 쓰레드는 풀에서 제거한다. 따라서 장시간 리소스를 점유하지는 않는다.
  • 그런데 갯수제한은?

ScheduledExecutor

  • Executors.newSingleThreadScheduledExecutor() 사용시에는 스케줄이 여러개라도 하나의 스케줄러 쓰레드가 실행한다.
  • scheduleAtFixedRate 는 태스크 시작시간 기준으로 delay를 적용한다. 앞선 스케줄이 여전히 실행중이면 기다렸다가 종료시점에 곧바로 시작한다.
  • scheduleWithFixedDelay 는 태스크 종료시간 기준으로 delay를 적용한다. 앞선 스케줄이 여전히 실행중이면 기다렸다가 종료시점에 delay 시간만큼 기다리고 실행한다.

ExecutorService vs. Fork/Join

  • Java 7 에 Fork/Join 추가
  • ExecutorService는 쓰레드 수를 조정할 수 있다. 서로 무관한 독립적인 태스크들을 각각의 쓰레드에서 실행할 때 사용.
  • Fork/Join은 하나의 작업을 여러개의 작은 조각으로 recursive하게 쪼개서 수행하여 성능을 높이고자 할 때 사용한다.

ExecutorService 주의점

  • 애플리케이션 종료할 때 올바르게 종료시켜야한다. shtudown() 패턴을 참조한다.
  • 쓰레드 풀 갯수(특히 fixed 일 때)를 너무 적게하면 애플리케이션 성능이 떨어지고 너무 많이하면 불필요한 쓰레드 생성 오버헤드가 증가한다. 갯수 조정에 주의할 것. CachedThreadPool을 사용하는게 낫지 않을까?
  • 취소된 Futureget()을 호출하지 말것.
  • Future.get()에 timeout을 적용하여 너무 오래 blocking 하지 않게 할 것.
  • 웹 애플리케이션 등에서 모든 요청마다 ExecutorService를 생성하지 말라. 요청이 폭주할 때 요청보다 더 많은 쓰레드가 생성된다. 미리 생성해둔 것을 ExecutorService를 재사용해야한다.

ThreadPoolExecutor

  • 대표적인 Thread Pool?
  • Spring @Async가 사용한다.
  • corePoolSize : 기본 Pool Size
  • maximumPoolSize : 최대 Pool Size
  • corePoolSize를 먼저 값을 설정하고 후에 maximumPoolSize를 설정할 것. setMaxPoolSize에서 corePoolSize validation을 수행함.
  • 여기서 매우 중요한 점은 Pool을 늘리는 규칙이다. 이 규칙을 잘 못 이해하면 corePoolSize 만큼의 pool 밖에 안 만들어진다.
A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).
corePoolSize 만큼의 쓰레드가 만들어져 있으면, 그 다음 쓰레드는 queueCapacity만큼 큐에 쌓여 있다가 corePool로 실행이 인입된다. 따라서 corePoolSize가 작고 queueCapacity가 매우 크면 queue가 꽉차기 전까지는 실제로 쓰레드 풀이 확장되지 않고 계속해서 corePool 만 재사용하게 된다.
  • reject : Executor가 shutdown 상태이거나, queue와 maximum size가 정해져 있을 경우 이것이 꽉 차면 execute(Runnable)가 reject 된다. 이 때 RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)가 호출된다.
  • corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE, keepAliveTime=60sec, queueCapacity=0 으로 지정하면 Executors.newCachedThreadPool() 설정.

ExecutorCompletionService

final ExecutorService pool = Executors.newFixedThreadPool(5);
 
// 응답 타입(String) 명시 필요
final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(pool);
 
for (final String site : topSites) {
    completionService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return IOUtils.toString(new URL("http://" + site), StandardCharsets.UTF_8);
        }
    });
}
 
// submit 한 갯수가 정확해야한다.
for(int i = 0; i < topSites.size(); ++i) {
    final Future<String> future = completionService.take(); // 먼저 실행되는 대로 바로 리턴
    try {
        final String content = future.get();
        //...process contents
    } catch (ExecutionException e) {
        log.warn("Error while downloading", e.getCause());
    }
}
java/concurrent/executorservice.txt · 마지막으로 수정됨: 2020/08/07 13:53 저자 kwon37xi