WEB/JAVA

내가 보려고 정리하는 JAVA 멀티 스레드 5- 스레드풀

스레드풀은 작업 처리에 사용되는 스레드를 제한된 개수만큼 정해 놓고 작업 큐에 들어오는 작업들을 하나씩 스레드가 맡아 처리한다.

병렬 작업 처리가 많아지면 스레드 개수가 증가되는데 그에 따른 스레드 생성, 스케줄링으로 인해 CPU가 바빠져 메모리 사용량이 늘어난다.

따라서 애플리케이션의 성능이 저화 되므로 스레드 풀을 사용하여 스레드 전체 개수가 늘어나지 않도록 해야한다.

 

스레드풀 생성

public class ExecutorExample {
  public static void main(String[] args) {
      // 1개의 스레드를 사용하는 스레드풀 생성
      ExecutorService singleThread = Executors.newSingleThreadExecutor();
      // 스레드를 제한 없이 사용하는 스레드풀 생성
      ExecutorService cachedThread = Executors.newCachedThreadPool();
      // 3개의 스레드를 사용하는 스레드풀 생성
      ExecutorService fixedThread = Executors.newFixedThreadPool(3);
      // CPU 코어의 수만큼 최대 스레드를 사용하는 스레드풀 생성
      ExecutorService maxFixedThread = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  }
}

스레드풀 종료

스레드풀의 스레드는 기본적으로 데몬 스레드가 아니기 때문에 mian 스레드가 종료되어도 작업을 처리하기 위해 계속 실행 상태로 남아있다.

// 남아있는 작업을 마무리하고 종료한다.
executorService.shutdown();
// 남아있는 작업과 상관없이 강제로 종료한다.
executorService.shutdownNow();

스레드 풀 작업 생성

하나의 작업은 Runnable , Callable 구현 클래스로 표현한다. 둘의 차이점은

Runnable -> 리턴값이 존재하지 않음.

Runnable task = new Runnable(){
	@Override
    public void run(){
    	// 작업 내용
	}
}

Callable -> 리턴값이 존재함

Callable<T> task = new Callable<T>(){
	@Override
    public T call() throws Exceoption{
    	//작업 내용
	return T;
	}
}

 

작업 처리 요청

ExecutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위다.

execute()는 작업 처리 결과를 받지 못하고 작업 처리 도중 예외가 발생하면 스레드가 종료된다.

submit() 은 작업 처리 결과를 받을 수 있도록 Future를 리턴하고 작업 처리 도중 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용 한다. 

 

-> 가급적 생성 오버헤더를 줄이기 위해 submit()을 사용하는 것이 좋다.

 

execute() 를 사용했을 때

스레드의 개수는 변함이 없고 실행 스레드의 이름이 모두 다른것을 확인할 수 있다.

이것은 작업 처리 도중 예외가 발생했기 때문에 해당 스레드는 제거되고 새 스레드가 계속 생성되기 때문이다.

      for (int i=0;i<10;i++){
          Runnable runnable = new Runnable() {
              @Override
              public void run() {
                  ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedThread;
                  int poolSize = threadPoolExecutor.getPoolSize();
                  String threadName = Thread.currentThread().getName();
                  System.out.println("총 스레드 개수 : "+poolSize+"작업 스레드 이름 :" + threadName);

                  //예외 발생 시킴
                  int value = Integer.parseInt("삼");
              }
          };
          // 작업 처리 요청
          fixedThread.execute(runnable);

 

submit() 을 사용했을 때

아래와 같이 스레드 이름이 늘어나지 않는것을 보면  스레드가 종료되지 않고 재사용 되는 것을 볼 수 있다.

 

 

블로킹 방식의 작업 완료 통보

submit() 메서드는 매개값으로 준 Runnable 이나 Callable 작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future객체를 리턴한다.

Future 객채는 작업 결과가 아니라 작업이 완료될 때까지 기다렸다가(블로킹) 최종 결과를 얻는데 사용된다. (지연 완료 객채)

 

Futrue의 get() 메서드를 사용하면 작업이 완료될 때까지 블로킹되었다가 처리결과 V를 리턴한다.(Future<V>)

get(long timeout,TimeUnit unit) 을 사용하면 시간 전에 작업이 완료되면 결과 V를 리턴하지만 완료되지 않으면 Exception을 발생시킨다.

 

submit(Runnable task) future.get() -> null 리턴

submit(Runnable task, Integer result) future.get() -> int 반환

submit(Callable<String> task) future.get() -> String 반환

 

위와 같이 블로킹 방식의 작업 완료 통보에서 주의할 점은 작업을 처리하는 스레드가 작업을 완료하기 까지는

get() 메서드가 블로킹 되므로 다른 코드를 실행할 수 없다.

그렇기 때문에 아래와 같이 get() 메서드를 호출하는 스레드는 새로운 스레드이거나 스레드풀 안에 있는 다른 스레드가 되어야 한다.

 

// 새로운 스레드를 생성해서 호출

new Thread (new Runnalbe(){
	@Overrid
	public void run(){
    	try{
		future.get();
            }catch (Exception e){
		e.printStackTrace();
            }
	}
 }
 }).start();
 
 // 스레드풀의 스레드가 호출
 
 executorService.submit(new Runnable(){
	@Override
	public void run(){
    	try{
		future.get();
            }catch (Excetion e){
		e.printStackTrace();
            }
	}
 });

 

Future 객체는 작업 결과를 얻기 위한 get() 메서드 이외에도 

cancel : 작업시작전 매개값과 상관없이 취소 후 true /작업이 진행 중일 경우 매개값이 true 일 경우에만 작업 스레드를 interrupt

isCancelled : 작업이 완료되었거나 어떤 이유로 인해 취소할 수 없다면 false 

isDone : 작업이 정상적,예외,취소 등 완료되었다면 true

 

1. 리턴값이 없는 작업 완료 통보

submit 메서드를 사용하면 된다.

결과값이 없음에도 불구하고 Future객체를 리턴하는데, 이것은 스레드가 작업 처리를 정상적으로 완료했는지 예외가 발생했는지 확인하기 위해서이다.

Future future = executorService.submit(task);


// null을 리턴하지만 interrupt되거나 예외가 발생할 경우를 위해 예외 처리 코드가 필요하다.

try{
	future.get(); // null
	}catch(InterruptedException e){
    	// 작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
	}catch(ExecutionException e){
    	// 작업 처리 도중 예외가 발생된 경우 실행할 코드
    }

 

2. 리턴값이 있는 작업 완료 통보

스레드가 작업 후 처리 결과를 얻어야 한다면 Callable 객체를 생성하면 된다.

// Callable 객체 생성
Callable<T> task = new Callable<T>() {
	@Override
	public T call() throws Exception{
    	//스레드가 처리할 작업 내용
        return T;
	}
};

// submit 메서드를 사용해 Future<T>를 리턴
Future<T> future = executorService.submit(task);

// 예외처리
try{
	T result = future.get();
    }catch(InterruptedException e){
	// 작업 처리 도중 스레드가 interrupt 될 경우
    }catch(ExecutionException e){
    // 작업 처리 도중 예외

 

3. 작업 처리 결과를 외부 객체에 저장

외부 Result 객체에 작업 결과를 저장하여 애플리케이션이 객체를 사용해 어떤 작업을 진행할 수 있게 해준다.

대게 Result 객체는 공유 객체가 되어, 두 개 이상의 스레드 작업을 취합할 목적으로 사용된다.

 

아래와 같이 공유 객체가 되어 사용된다.

// 공유 객체 생성
Result result = new Result();
// 작업 객체 생성
Runnable task = new Task(result);
// 작업 처리 요청
Future<Result> future = excutorServie.submit(task,result);

// 결과 받기 or 예외 처리
try{
    result = future.get();
    }catch (Excetion e){
    e.printStackTrace();
    }


// 공유 객체
class Result {
	int accumValue;
    synchronized void addValue(int value){
    accumValue += value;
    }
}

 

4. 작업 완료 순으로 통보

작업의 양과 스레드 스케줄링에 따라서 먼저 요청한 작업이 나중에 완료되는 경우도 발생한다.

순차적일 필요가 없을 경우 처리가 완료된 것부터 결과를 얻어 사용할 수 있다.

CompletionService 의 poll() take() 를 사용하면 된다.

 

poll() -> 완료된 작업이 없다면 즉시 null return

take() -> 완료된 작업이 없다면 있을 때까지 블로킹됨

 

ExcutorService executorService = Excutors.newFixedThreadPool(
	Runtime.getRuntime().availableProcessors());

// 객체를 생성할 때 생성자 매개값으로 ExecutorService를 제공
CompletionService<V> completionService = new ExecutorCompletionService<V>{
	executorService);

// submit() 메서드로 작업 처리 요청
// 스레드풀에게 작업 처리 요청
completionService.submit(Callable<T> task);
completionService.submit(Runable task, V result);


// while문이 있을 경우 애플리케이션이 종료될 때까지 반복 실행해야 하므로 스레드풀의 스레드에서 실행하는 것이 좋다.
executorService.submit(new Runnable(){
	@Override
    public void run(){
    	while(true){
        	try{
            	// 완료된 직업 가져오기
            	Future<Integer> future = completionService.task();
                int value = future.get();
                } catch (Exciption e) {
                break;
                }

 

5. 콜백 방식의 작업 완료 통보

스레드가 작업을 완료하면 특정 메서드를 자동 실행하는 기법이다. 이때 자동 실행되는 메서드를 콜백 메서드라고 한다.

 

블로킹 방식과 콜백 방식의 차이점은 무엇일까?

블로킹 방식은 작업 처리를 요청한 후 작업이 완료될 때까지 블로킹되지만

콜백 방식은 작업 처리를 요청한 후 결과를 기다릴 필요 없이 다른 기능을 수행할 수 있다.  -> 작업 처리가 완료되면 자동으로 콜백 메서드가 실행되어 결과를 알 수 있기 때문

 

ExecutorServicesms 은 콜백을 위해 별도의 기능을 제공하지 않지만 Runnable 구현 클래스를 작성할 때 콜백 기능을 구현할 수 있다.

직접 정의해도 좋고 CompletionHandler를 이용해도 좋다.

 

CompletionHandler<V,A> callback = new CompletionHandler<V,A>(){
	@Override
    public void completed(V result,A attachment){
    // 작업을 정상 처리 완료했을 떄 호출되는 콜백 메서드
    }
    @Override
    public void failed(Throwable exc, A attachemnt){
    // 작업 처리 도중 예외가 발생했을 떄 호출되는 콜백 메서드
    };
}

Runnable task = new Runnable(){
	@Override
    public void run(){
    	try{
		// 작업처리
		V result = ...;
		// 작업을 정상 처리했을 경우 호출
		callback.completed(result,null);
           }catch(Exception e){
		// 예외가 발생 했을 경우 호출
		callback.failed(e,null);
           }
 	}
};

여기서 V타입 파라미터는 결과값의 타입이고 A는 첨부값(콜백 메서드에 결과값 이외 추가적으로 전달 하는 객체)의 타입이다. 

만약 첨부값이 필요 없다면 Void로 지정해주면 된다.

 

(예제)

import javax.annotation.processing.Completion;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackExample {

    // 스레드풀 선언
    private ExecutorService executorService;

    // 스레드 풀에 스레드 갯수 초기화
    public CallbackExample(){
        executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    // V - Integer A - Void 으로 콜백메서드를 가진 객체 생성
    private CompletionHandler<Integer,Void> callback =
            new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer result, Void attachment) {
                    // 작업이 정상 처리 되었을 때 호출
                    System.out.println("completed() 실행 : " +result);
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    // 예외가 발생 했을 경우 호출
                    System.out.println("failed() 실행 : " + exc.toString());
                }
            };

    public void doWork(final String x, final String y){
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try{
                    int intX = Integer.parseInt(x);
                    int intY = Integer.parseInt(y);
                    int result = intX+intY;
                    // 정상처리 콜백함수 호출
                    callback.completed(result,null);
                }catch (NumberFormatException e){
                    // 예외 콜백함수 호출
                    callback.failed(e,null);
                }
            }
        };
        // 스레드풀에게 작업 요청
        executorService.submit(task);

    }
    public void finish(){
        executorService.shutdown();
    }


  public static void main(String[] args) {
    CallbackExample example = new CallbackExample();
    example.doWork("3","3");
    // 에러 발생
    example.doWork("3","삼");
    example.finish();
  }
}