SW 개발

Spring Boot, JobRunr, 분산 노드에서 작업 실행하기

지단로보트 2022. 1. 12. 18:40

개요

  • 백엔드에는 실시간으로 이루어지는 클라이언트로부터의 요청/응답 처리 외에도 최소 1분 이상의 오랜 시간이 소요되는 성격의 무거운 작업을 실행해야 하는 상황이 많다. 이를 위한 수단으로 Spring Batch, Quartz 등이 있는데 학습 곡선이 있는 편이다. JobRunrn개로 수평 확장되는 멀티 노드 마이크로서비스 환경에서 복잡성을 최소화화면서 가장 간단한 방법으로 Fire and Forget 성격의 특정 작업을 분산 실행할 수 있는 오픈 소스 Java 라이브러리이다. 이번 글에서는 Spring Boot 기반 프로젝트에서 JobRunr을 사용하는 법을 정리하였다.

JobRunr 작동 원리

  • 물리적으로 동일한 애플리케이션끼리만 Job을 분산하여 실행할 수 있다. 실행해야할 대상 메써드와 파라메터를 JSON으로 직렬화하여 전달하며, Job을 실행하게될 애플리케이션도 호출한 애플리케이션과 동일해야 한다.
  • 각 애플리케이션 노드는 BackgroundJobServer라 불리는 빈에 의해 기본적으로 128개의 쓰레드 풀을 이용하여 호출된 Job을 실행한다.
  • 각 애플리케이션 노드 간의 통신 방법은 오직 원격지의 데이터베이스 뿐이다. 즉, 동일한 데이터베이스를 StorageProvider로 설정한 애플리케이션끼리만 같은 클러스터로 기능할 수 있다. 데이터베이스만 바라보므로 대시보드를 제외하고는 어떠한 포트 개방도 필요 없다.
  • 이상의 내용을 정리하면, 특정 노드에서 Job 실행을 요청하면 데이터베이스에 해당 JobEnqueued/Scheduled 상태로 저장되고, 처리 가능한 다른 노드가 해당 Job을 데이터베이스에서 가져오면 Processing 상태로 변경된다. 실행이 완료되면 Succeeded 상태로 변경되고, 실패하면 Failed 상태로 변경된다.
  • 모든 Job의 진행 상황은 빌트인된 대시보드 페이지에서 확인할 수 있다. Job 내에서 ProgressBar를 구현했다면 진행율까지 확인할 수 있다. (대시보드는 허가된 계정으로만 접속하도록 설정이 가능하다.)

JobRunr 장점

  • 특정 인터페이스의 구현체를 작성할 필요 없이 일반적인 스프링 빈의 메써드에 @Job만 추가하면 분산 노드에서 실행 가능한 작업으로 변신할 수 있다.
  • 분산 처리가 아주 쉽다. 일시적으로 아주 오래 걸리는 무거운 작업을 실행하고자할 경우 노드를 충분히 수평 확장하고, 작업을 적절히 쪼개어 실행시키면 빠르게 끝낼 수 있다. (그래서 스케일링이 유연한 컨테이너 오케스트레이션 환경과의 궁합이 좋다.)
  • 직관적인 UI의 대시보드를 제공한다. 대시보드 페이지에서 모든 작업의 진행 상황과 성공, 실패 여부를 실시간으로 확인할 수 있다.

build.gradle.kts

  • 프로젝트 루트의 build.gradle.kts에 아래 내용을 추가한다.
dependencies {
    implementation("org.jobrunr:jobrunr-spring-boot-starter:4.0.5")
}

application.yml

  • application.yml에 아래 내용을 추가한다.
org:
  jobrunr:
    database:
        # 데이터베이스 생성/사용될 테이블의 접두어
        table_prefix: foo
    background-job-server:
      enabled: true
    dashboard:
        # 대시보드 활성화
      enabled: true
      # 대시보드 리스너 포트
      port: 8000
      # 대시보드 어드민 계정
      username: {username}
      password: {password}
  • 프로덕션 환경에서는 아래와 같이 환경 변수로 설정할 수 있다.
ORG_JOBRUNR_BACKGROUND_JOB_SERVER_ENABLED=true
ORG_JOBRUNR_DASHBOARD_ENABLED=true
ORG_JOBRUNR_DASHBOARD_PORT=8000
ORG_JOBRUNR_DASHBOARD_USERNAME=admin
ORG_JOBRUNR_DASHBOARD_PASSWORD=${spring-security-user-password}

@Configuration

  • @Configuration 빈을 아래와 같이 작성한다. 예제로 인메모리 데이터베이스를 작성했는데 프로덕션 레벨을 위한 MySQL, MongoDB, Redis 등 다양한 선택지가 존재한다.
import org.jobrunr.jobs.mappers.JobMapper
import org.jobrunr.storage.InMemoryStorageProvider
import org.jobrunr.storage.StorageProvider
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class JobRunrConfig {

    @Bean
    fun storageProvider(jobMapper: JobMapper): StorageProvider {

        val storageProvider = InMemoryStorageProvider()
        storageProvider.setJobMapper(jobMapper)
        // [옵션] Job의 상태 변화 시점을 제어할 수 있는 리스너 빈 등록
        storageProvider.addJobStorageOnChangeListener(JobChangeListenerImpl())

        return storageProvider
    }
}
  • JobRunr의 모든 Job의 실행은 기본적으로 Fire and Forget이다. 따라서 코드 레벨에서 해당 Job의 실행 결과에 개입할 수 없게 되어 있다. 하지만, JobChangeListener 인터페이스의 구현체를 제작하고 위와 같이 등록하면 특정 Job의 상태 변화 시점에 필요한 행위를 할 수 있다. (예를 들면 해당 Job의 완료, 실패 시점에 RedisPub/Sub으로 메시지를 전파할 수 있다.)

Job 실행 예

import org.jobrunr.jobs.annotations.Job
import org.jobrunr.scheduling.JobScheduler
import org.springframework.stereotype.Service

@Service
class FooJobService(
    private val jobScheduler: JobScheduler
) {
    fun launchJob() {

        // [방법 1] fooJob()을 실행하고 잊어버린다.
        val jobId = jobScheduler.enqueue { fooJob({fooId}) }

        // [방법 2] fooJob()을 실행하고 잊어버린다.
        // 미리 생성한 UUID를 전달
        val jobId = UUID.randomUUID()
        jobScheduler.enqueue(jobId) { fooJob({fooId}) }
    }

    // JobRunr의 Job을 정의
    // 예외 발생시 최대 재시도 횟수를 지정하지 않으면 기본값 10
    // [옵션 1] @Job을 생략해도 실행 가능하지만 옵션 설정이 불가능
    // [옵션 2] JobContext를 생략해도 실행 가능하지만 Job 실행시 메타 정보 획득이 불가능
    @Job(name = "fooJob", retries = 3)
    fun fooJob(jobContext: JobContext, fooId: Int) {

        // JobContext 정보 출력
        jobContext.jobId // eecdb998-cc27-47e2-ae6b-8e4ad3071133
        jobContext.jobName // fooJob
        jobContext.jobState // PROCESSING
        jobContext.createdAt // 2022-01-12T15:54:12.503152Z
        jobContext.updatedAt // 2022-01-12T15:54:19.083501Z
        jobContext.signature // com.jsonobject.example.FooJobService.fooJob(org.jobrunr.jobs.context.JobContext,java.lang.Integer)
        jobContext.metadata // {}
        
        // 대시보드에 실시간으로 출력될 처리 경과의 퍼센트 값을 로직에서 설정 가능
        val progressBar = jobContext.progressBar(100)
        (1..100).forEach {
            Thread.sleep(1000)
            progressBar.setValue(it)
        }
    }
}
  • Job 실행 중에 예외가 발생할 경우, 별도로 예외를 처리하지 않아도 대시보드에 예외에 대한 stackTrace 문자열을 출력해준다.

Job 예외 발생시 출력 로그

  • 특정 Job을 실행시 예외가 발생하면 최대 재시도 횟수를 채우기 전에는 아래와 같이 WARN 로그를 남긴다.
WARN 22858 --- [ool-2-thread-47] o.jobrunr.server.BackgroundJobPerformer  : Job(id=81d1d847-ebf1-4f04-916a-107eeb90babf, jobName='fooJob') processing failed: An exception occurred during the performance of the job
  • 최대 재시도 횟수를 채우게 되면 최종적으로 아래와 같이 ERROR 로그를 남긴다.
ERROR 22858 --- [ool-2-thread-56] o.jobrunr.server.BackgroundJobPerformer  : Job(id=688f227c-0f20-41c3-abfd-5413f8a46804, jobName='fooJob') processing failed: An exception occurred during the performance of the job

참고 글