티스토리 뷰

개요

  • Spring Boot 기반의 웹 애플리케이션은 기본적으로 각 요청에 대해 동기 방식으로 로직을 실행한다. 하지만, 필요에 따라 비동기 방식으로 로직을 실행해야 하는 경우도 있다. 예를 들면 I/O 부하가 많이 발생하는 배치 작업을 싱글 쓰레드로 처리하면 CPU와 메모리는 남아도는데, 병목이 생겨 느려지기 쉽상이다. 이런 경우 적절히 n개의 멀티 쓰레드로 쪼개어 배치 작업을 실행하면 처리 시간을 경우에 따라 드라마틱하게 단축할 수 있다. 이번 글에서는 이러한 사례를 해결하기 위한 비동기 방식의 실행 방법에 대해서 설명하고자 한다.

RejectedExecutionHandler 구현체 작성

  • ThreadTaskExecutor에 의해 관리되는 쓰레드 풀의 가용 쓰레드가 한계치에 도달할 경우, TaskRejectedException 예외가 발생한다. 이 점을 간과하면 운영 레벨에서 의도하지 않은 장애를 경험하게 된다. RejectedExecutionHandler 인터페이스의 구현체를 작성하여 ThreadPoolExecutor 생성시 명시하면 해당 예외 발생 시점의 처리 로직을 작성할 수 있다.
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.ThreadPoolExecutor

class AsyncRejectedExecutionHandler : RejectedExecutionHandler {

    override fun rejectedExecution(r: Runnable?, executor: ThreadPoolExecutor?) {
        // 가용 쓰레드가 없을 경우 처리 로직 작성
        // 아무 것도 작성하지 않으면 TaskRejectedException 예외를 무시
    }
}

TaskDecorator 구현체 작성

  • org.slf4j.MDC 클래스는 ThreadLocal에 의해 현재 쓰레드의 로그 정보를 필드 단위로 저장할 수 있게 해준다. 일반적인 단일 쓰레드 로직에서는 편리하게 사용하지만 쓰레드 풀의 가용 쓰레드 전환될 경우, 역시 의도하지 않게 로그 필드가 유실되거나 엉뚱한 과거의 로그 필드가 쌓이게 된다. TaskDecorator 인터페이의 구현체를 아래와 같이 작성하면 현재 쓰레드를 호출한 부모 쓰레드의 로그 필드 정보를 그대로 이어 받을 수 있다. (보다 자세한 내용은 본 블로그의 이 글을 참고한다.)
import org.slf4j.MDC
import org.springframework.core.task.TaskDecorator

class LoggingTaskDecorator : TaskDecorator {

    override fun decorate(task: Runnable): Runnable {

        val callerThreadContext = MDC.getCopyOfContextMap()

        return Runnable {
            callerThreadContext?.let {
                MDC.setContextMap(it)
            }
            task.run()
        }
    }
}

@Configuration 클래스 작성

  • 제일 먼저, 비동기 로직 실행을 담당할 쓰레드 풀을 관리하는 TaskExecutor 빈을 등록하고 Spring Boot의 비동기 실행이 몇시된 메써드의 인식을 활성화한다.
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.task.TaskExecutor
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

@Configuration
@EnableAsync
class AsyncConfig {

    @Bean
    fun taskExecutor(): TaskExecutor {

        val taskExecutor = ThreadPoolTaskExecutor()
        taskExecutor.corePoolSize = 10
        taskExecutor.setQueueCapacity(50)
        taskExecutor.maxPoolSize = 30
        taskExecutor.setTaskDecorator(LoggingTaskDecorator())
        taskExecutor.setRejectedExecutionHandler(AsyncRejectedExecutionHandler())

        return taskExecutor
    }
}
  • @EnableAsync를 명시함으로서 비동기로 작동할 @Async가 명시된 빈을 인식하도록 한다.
  • 위 예는 쓰레드 풀 생성시 최소 가용 쓰레드로 10개를 생성한다. 만약 10개 쓰레드가 모두 사용 중이라면 큐에 50개까지 대기할 수 있다. 대기 중인 큐마저 모두 차버리면 가용 쓰레드를 최대 30개까지 생성한다. 최대 가용 쓰레드까지 차버리면 TaskRejectedException 예외가 발생하는데 앞서 작성한 RejectedExecutionHandler 구현체를 지정하여 해당 이슈에 대응할 수 있다.

AsyncService 작성

  • 이제 쓰레드 풀의 가용 쓰레드에 의해 실행될 비동기 서비스 메써드를 작성할 차례이다.
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service
import java.util.concurrent.CompletableFuture

@Service
class AsyncService {

    @Async("taskExecutor")
    fun doSomethingAndReturnNothing(input: String) {

        // 비동기 실행 로직 작성
    }

    @Async("taskExecutor")
    fun doSomethingAndReturnSomething(input: String): CompletableFuture<String> {

        // 비동기 실행 로직 작성
        val output = input.toUpperCase()

        return CompletableFuture.completedFuture(output)
    }
}
  • Spring Boot 기반의 웹 애플리케이션은 HTTP 요청이 들어왔을 때 내장된 서블릿 컨테이너(Tomcat)가 관리하는 독립적인 1개의 Worker 쓰레드에 의해 동기 방식으로 실행된다. 하지만 요청 처리 중 @Async가 명시된 메써드를 호출하면 앞서 등록한 ThreadPoolTaskExecutor 빈에 의해 관리되는 또 다른 독립적인 Worker 쓰레드로 실행된다. 별도의 쓰레드로 동작하기에 원래의 요청 쓰레드는 그대로 다음 문장을 실행하여 HTTP 응답을 마칠 수 있다.
  • @Async가 명시된 메써드는 반드시 public으로 선언되어야 한다. 또한 같은 클래스 내에서 해당 메써드를 호출할 경우 비동기로 작동하지 않는다.
  • @Async("someExecutor")와 같이 java.util.concurrent.Executor 인터페이스를 구현한 빈의 이름을 명시할 수 있다. 대표적으로 앞서 언급한 ThreadPoolTaskExecutor 빈이 있다. 해당 메써드는 지정된 빈이 제공하는 쓰레드를 사용하여 실행된다.

  • 리턴 타입으로 CompletableFuture<String>와 같이 CompletableFuture 인터페이스를 명시하면 호출한 곳에서 쓰레드에 의해 실행될 해당 메써드의 미래 결과를 추적할 수 있다.

AsyncConsumerService 작성

  • 앞서 작성된 비동시 서비스 메써드를 호출하는 예제를 작성할 차례이다.
import org.springframework.stereotype.Service
import java.util.concurrent.CompletableFuture

@Service
class AsyncConsumerService(
    private val asyncService: AsyncService
) {
    fun doSomethingInParallel() {

        val completableFutures = mutableListOf<CompletableFuture<String>>()

        // alpha, bravo, charlie 를 아규먼트로 전달하여 동일 메써드를 3회 병렬 실행
        arrayOf("alpha", "bravo", "charlie").forEach {
            completableFutures.add(
                asyncService
                    .doSomethingAndReturnSomething(it)
                    .exceptionally { ex -> throw(ex) } // 비동기 쓰레드 실행 중 예외 발생시 호출 메써드로 던짐
            )
        }

        // 모든 비동기 쓰레드가 종료될 때까지 대기
        CompletableFuture.allOf(*completableFutures.toTypedArray()).join()

        // 종료된 각 비동기 쓰레드의 실행 결과 출력
        // ALPHA, BRAVO, CHARLIE
        completableFutures.forEach {
            println(it.get())
        }
    }
}
  • CompletableFuture<T>를 활용하여 개별 쓰레드로 병렬 실행되는 비동기 로직의 결과와 발생하는 예외를 제어할 수 있다. 이를 통해 단순히 실행하고 잊어버린다는 개념을 떠나, 적극적으로 I/O 부하 등으로 처리 시간이 발생하는 배치 처리를 복수개의 쓰레드로 실행하고 각 최종 결과를 획득하는 방식으로 소요 시간을 효과적으로 단축시킬 수 있다.

참고 글

댓글
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함