티스토리 뷰

Java

ThreadPoolExecutor

개발자-김씨 2017. 1. 9. 03:33
반응형

ThreadPoolExecutor보다 사용하기 쉬운 고수준API들이 많기 때문에 굳이 ThreadPoolExecutor을 사용할 일은 잘 없지만 

ThreadPoolExecutor을 알아두면  고수준 스레드풀 API를 사용하는데 도움이 되죠 ~

 

일반적으로 ThreadPoolExecutor 생성 시 아래와 같이 선언합니다.

1
new ThreadPoolExecutor(2460, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
cs

위와 같이 생성하면 일단 스레드를 2개 만들어 두고 스레드가 부족하면 최대 4개까지 만들다가 더 이상 작업을 할당할 스레드가 부족하게 되면 큐에 대기작업이 등록될거라고 생각할수 있습니다. 

(저는 그렇게 생각했더라는 ... 큐의 매개변수가 BlockingQueue라서 더 그랬던거 같습니다. 이 이야기는 차츰차츰)

하지만 실제로 돌려보면 예상과 다르게 동작합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class 테스트1 {
    
    public static void main(String[] args) throws Exception {
     final ExecutorService tp = new ThreadPoolExecutor(246, TimeUnit.SECONDS,new LinkedBlockingQueue());
        for (int i = 0; i <  20; i++) {
            //스레드풀에 task 20개 등록
            System.out.println(tp.toString());
            final int j = i;
            tp.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((j * 20+100);
                        System.out.println(tp.toString());
                    } catch (InterruptedException e) { }
                }  
            });  
        }
     }
}
cs

결과 : 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
....
[Running, pool size = 2, active threads = 2, queued tasks = 15, completed tasks = 0]
[Running, pool size = 2, active threads = 2, queued tasks = 16, completed tasks = 0]
[Running, pool size = 2, active threads = 2, queued tasks = 17, completed tasks = 0]
[Running, pool size = 2, active threads = 2, queued tasks = 18, completed tasks = 0] 20개 모두 등록
[Running, pool size = 2, active threads = 2, queued tasks = 17, completed tasks = 1]
[Running, pool size = 2, active threads = 2, queued tasks = 16, completed tasks = 2]
[Running, pool size = 2, active threads = 2, queued tasks = 15, completed tasks = 3]
....
[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 15]
[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 16]
[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 17]
[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 18]
[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 19]
[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 20]
cs
 

최초 pool size가 0에서 작업이 등록되면서 2개까지 증가합니다.

하지만 maximunPoolSize를 4로 설정했지만 종료될 때까지 pool size는 항상 2개 입니다.

왜 pool size는 2개에서 더 이상 증가하지 않는 걸까요?

 

ThreadPoolExecutor의 동작방식 

작업 요청이 들어오면 스레드풀은 스레드 숫자를 corePoolSize까지 생성합니다.

모든 스레드가 처리중이면 이 후 요청작업은 Queue에 등록하여 대기하도록 합니다.

그러다 더 이상 Queue에도 대기작업을 등록할 수 없게 되면 임시 스레드를 추가해서 스레드 숫자를 maximumPoolSize까지 늘이게 됩니다. 큐가 가득차서 상시 스레드보다 많은 스레드가 생생된 상태를 임시풀(상태)이라고 합니다.

반대로 상시 스레드(corePoolSize)만으로 처리되는 풀을 상시풀(상태)이라고 합니다.

임시 스레드는 keepAliveTime 시간동안 더 이상 작업이 할당되지 않으면 반환되지만 

상시 스레드는 한 번 생성되면 계속 유지됩니다.

     * corePoolSize는 상시풀 상태의 스레드풀 최대 크기

     * maximumPoolSize는 임시풀 상태의 스레드풀 최대 크기

    상시 스레드 반환 시간

      allowsCoreThreadTimeOut(boolean) 메소드를 통해서 상시 스레드도 keepAliveTime설정을 적용시킬 수도 있음

 

 

왜 스레드를 바로 늘이지 않고 대기열(BlockingQueue)을 이용할까?

스레드 생성 비용은 매우 크기 때문에 먼저 큐를 이용하도록 하는 것입니다.

고속도로 톨게이트처럼 교통량이 조금 늘어난다고 해서 바로 수납원을 투입하지 않고 일단 줄을 세우는 것과 비슷한거죠.

너무 당연한가요..?

 

 

그러면 정리한 내용대로 동작하는지 확인해 보도록 하겠습니다.

대기열이 가득차야지만 스레드 수가 증가되므로 큐 사이즈를 10, 최대 스레드 숫자를 20으로 하고 20개의 작업을

등록해 보도록 하겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class 테스트2 {
    
    public static void main(String[] args) throws Exception {
     final ExecutorService tp = new ThreadPoolExe(2,206,TimeUnit.SECONDS,new LinkedBlockingQueue(10));
        for (int i = 0; i <  20; i++) {
            //스레드풀에 task 20개 등록
            System.out.println(tp.toString());
            final int j = i;
            tp.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((j * 20+100);
                        System.out.println(tp.toString());
                    } catch (InterruptedException e) { }
                }  
            });  
        }
 
  Thread.sleep(10000);
        System.out.println(tp.toString());
     }
}
cs
 
 

결과 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
...
[Running, pool size = 2, active threads = 2, queued tasks = 10, completed tasks = 0]
[Running, pool size = 3, active threads = 3, queued tasks = 10, completed tasks = 0]
[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]
...
[Running, pool size = 9, active threads = 9, queued tasks = 10, completed tasks = 0]
[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]
[Running, pool size = 10, active threads = 10, queued tasks = 9, completed tasks = 1]
[Running, pool size = 10, active threads = 10, queued tasks = 8, completed tasks = 2]
...
[Running, pool size = 10, active threads = 10, queued tasks = 3, completed tasks = 7]
[Running, pool size = 10, active threads = 10, queued tasks = 2, completed tasks = 8]
[Running, pool size = 10, active threads = 10, queued tasks = 1, completed tasks = 9]
[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 10]
[Running, pool size = 10, active threads = 9, queued tasks = 0, completed tasks = 11]
[Running, pool size = 10, active threads = 8, queued tasks = 0, completed tasks = 12]
...
[Running, pool size = 10, active threads = 2, queued tasks = 0, completed tasks = 18]
[Running, pool size = 10, active threads = 1, queued tasks = 0, completed tasks = 19]
[Running, pool size = 10, active threads = 0, queued tasks = 0, completed tasks = 20]
..
[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 20]
cs

4행) 상시풀(corePooSize) 2개가 실행중이기 때문에 queue에 등록되기 시작

7행) 13개가 등록될 때부터는 스레드와 queue가 다 찼기 때문에 임시 스레드가 하나씩 추가 된다.

11행) 20개의 작업 요청이 모두 등록되면, 임시 스레드가 8개 추가되어 총 스레드 숫자는 10이 되고 

       큐에도 총 10개의 대기작업이 등록된다.

12행) 작업을 끝낸 스레드가 큐에서 대기작업을 하나씩 꺼내간다.

19행) 더 이상 큐에 대기 작업이 없다면 스레드는 비활성화 된다.

24행) 20개 작업이 모두 완료되었고 스레드 풀의 크기는 10

26행) 10초 후, 임시 스레드가 반환되어 스레드 풀의 크기는 2

 

 

설정 ( maxiumPoolSize, queue size, keepAliveTime  )

- 최대 스레드 수

너무 많은 스레드가 생성되게 되면 리소스 문제로 인해 치명적인 문제가 발생할 수 있기 때문에 최대 스레드 숫자는 조금 보수적으로 설정하는 것이 좋습니다. 

 

- 큐 사이즈

큐의 크기가 너무 크다면 지연이 발생하게 되고 반대로 너무 작다면 스레드가 빈번하게 생성&반환 될 수 있습니다.

큐 사이즈가 너무 커서 지연이 계속 누적되는 것을 인지하지 못하면 치명적인 문제가 생길 수 있기 때문입니다.

(아래에서 자세히..)

 

- 스레드 반환 시간

스레드는 상당히 많은 자원이 드는 작업이기 때문에 가급적이면 적은 스레드 숫자를 유지하고 생성과 반환 또한 빈번하지 않도록 하는 것이 좋습니다.

경우에 따라 상시 스레드 수 = 최대 스레드 수로 설정해서 생성.반환 비용을 줄이는 전략을 선택할 수도 있습니다.

 

 

 

등록 실패 예외와 대응 정책 

대기열도 가득차고, 임시 스레드도 더 이상 생성할 수 없는 상태에서 요청이 등록되면 

ThreadPoolExecutor은 RejectedExecutionException을 던집니다.

그런데 작업 등록이 실패했다고 해서 꼭 Exception를 발생시킬 필요는 없습니다. 

경우에 따라서 무시할 수도 있고 우선 순위를 변경할 수도 있는데 이런 경우를 위해 핸들러를 제공합니다.

 

RejectedExecutionHandler

ThreadPoolExecutor의 setRejectedExecutionHandler메소드를 통해 핸들러를 등록 할 수도 있습니다.

 

기본 제공 RejectExceptionHandler구현체 (ThreadPoolExecutor 내부 클래스로 정의되어 있습니다.)

  - ThreadPoolExecutor.AbortPolicy (기본 정책)

  작업이 거절되면 RejectedExecutionException을 발생시킵니다.

  AbortPolicy가 기본 정책이라 예제-테스트1에서 RejectedExecutionException가 발생했던 것입니다.

  - ThreadPoolExecutor.CallerRunsPolicy

  작업이 거절되면 호출한 스레드, 즉 메인스레드에서 작업이 실행됩니다. 

  - ThreadPoolExecutor.DiscardPolicy

  작업이 거절되면 무시합니다. 아무런 동작을 하지 않습니다.

  - ThreadPoolExecutor.DiscardOldestPolicy

  작업이 거절되면 가장 오래된(가장 처음 등록된) 작업을 제거하고 다시 실행하도록 합니다.

혹은 사용자가 직접 RejectedExecutionHandler을 구현하여 대응정책을 설정할 수도 있습니다. 

 

 

 

 

 

Queue 전략

스레드가 큐에서 작업을 꺼낼때까지 대기하기 때문에 ThreadPoolExecutor의 큐는 BlockingQueue하위 구현체입니다.

( BlockingQueue의 take()메소드나 poll(time, unit)메소드는 블록킹되는 메소드)

 

 

ThreadPoolExecutor Java doc에 보시면 세가지 Queue전략에 대한 설명이 나옵니다. 

1 .용량이 없는 큐(Direct Handoffs)전략 

Queue가 없기 때문에 스레드 풀 사이즈가 최대 스레드 갯 수를 넘어가면 작업이 거절됩니다. (RejectedExecutionException)

이 때는 SynchronousQueue사용을 추천합니다. (SynchronousQueue는 할당을 기다리는 소비자가 없으면 바로 등록 실패)

 

 

2. 무제한 큐 전략

Queue가 무제한이기 때문에 스레드 숫자는 항상 corePoolSize를 넘기지 않습니다. (maximumPoolSize 설정 무시 됨)

시스템 리소스가 일정 수준을 넘지 않으므로 안정성을 확보할 수 있지만, 요청량이 늘어난다면 queue에 대기 작업이 계속 누적될 수 있기 때문에 작업 지연에 대한 대응책이 필요합니다.

Java Doc 무제한 큐 예제로 LinkedBlockingQueue가 나오는데 LinkedBlockingQueue자체가 무제한 큐는 아닙니다.

LinkedBlockingQueue 생성 시, 초기값을 주지 않으면 Integer.MAX_VALUE로 설정됩니다.

 

 

3. 제한적인 큐 전략

Queue가 제한적이기에 대기작업이 많아지면 스레드가 증가합니다. 적절한 maximumPoolSize설정을 통해 시스템 부하 및 작업 지연에 대해 적절한 조치가 가능합니다. 물론 적절한 값을 설정해야합니다.

 

 

** 일반적으로 무제한 큐나 제한큐일 때, ArrayBlockingQueue보다 LinkedBlocking가 선호됩니다.

이유는 Array와 Linked의 차이

  

 

 

지금까지 ThreadPoolExecutor에 대해서 알아보았습니다.

 

 

참고 ) https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

  

 

 

 

 

반응형

'Java' 카테고리의 다른 글

ClientSocketUtil 개선버전  (0) 2023.04.12
Socket Util 만들어 보기  (0) 2023.04.11
자바 동기화 처리 - volatile 와 synchronized  (1) 2020.11.25
synchronized 와 Double-checked locking  (0) 2020.11.20
Optional에 대해....  (0) 2020.10.22
댓글