Recent Posts
Recent Comments
Link
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
Archives
Today
Total
관리 메뉴

개발 일기

스프링 배치 병렬 처리 가능하도록 변경하기. 본문

Tech/배워서 남주기

스프링 배치 병렬 처리 가능하도록 변경하기.

flow123 2024. 11. 16. 22:25

이전 편: https://writerroom.tistory.com/379

상황

공제 가맹점이라는 테이블 특성 상, Hit 가 적을 수 밖에 없기에 여기서는 성능 개선의 여지가 적다고 판단했다.

다시 로직을 보면, 아래 6개의 과정으로 분류할 수 있다. 

팀과 논의 후, 슬로워 쿼리 개선 /캐시 적용 등을 이미 해보았으니, DB 부하 감소로는 최적화의 여지가 없다고 판단했다. 

현재 로직이 이미 복잡하기에, 작업을 멀티 쓰레드에서 동시에 실행해서 처리 속도를 높여보기로 했다

 

[로직] 

 

1)올해 국내 결제 내역 조회(Reader)

 

*2-5는 Processor

2)결제 건의 가맹점 번호를 기반으로 공제 타입 확인

3)이미 연말정산 된 거래인지 중복 확인

4)할인,그룹, 거래 타입 분류 등 기타 상세 내역 확인

5)정산에 필요한 정보를 여러 테이블에서 조합

 

6)저장 (Writer)

병렬 처리 

배치를 병렬로 구성할 때, 두 가지 방법을 생각해보았다. 

코드 변경 포인트가 많은지, 코드 제어가 얼마나 가능한지 등을 중점적으로 보았다.

방법 장점  단점 
배치 코드 자체에 적용
(Task Executor)
-Spring Batch 내부에서 병렬 처리되므로 관리 용이
-배치 실행 중 실패 시 Spring Batch의 Retry & Skip 기능 활용 가능
-하나의 Job 안에서 컨트롤 가능
Reader 등 모든 요소가 병렬 처리 가능하도록 변경 필요 

'&' 을 사용하며
fromId-toId 로 물리적으로 데이터 범위 나눠 실행 
(젠킨스 스크립트) 
위 방법보다 구현이 간단하다.  -상세한 코드 제어가 어려움
-(이전 글) preloadStep 에서 모든 가맹점 데이터를 레디스에 한번 올리고, 작업 후에 삭제한다. 여러 스레드에서 이 Step 을 병렬로 하다보면, 데이터 정합성 문제가 생긴다.

 

2번 방식으로 하면, 아래와 같은 식으로 스크립트를 짜면 된다. 

& 연산자는 리눅스/유닉스 기반 쉘 스크립트에서 사용되며, 특정 명령어를 백그라운드에서 실행할 때 활용된다. 

Jenkins 에서 Build Step을 추가하면서, Execute Shell 에 아래와 같이 쓰면 여러 프로세스를 병렬로 실행할 수 있다

java -jar batch-job.jar --fromId=1 --toId=10000 &
java -jar batch-job.jar --fromId=10001 --toId=20000 &
java -jar batch-job.jar --fromId=20001 --toId=30000 &

하지만 위의 단점에서 언급한 것처럼, Redis 의 데이터를 사용 후 삭제 하는 과정에서 정합성 문제가 생긴다 (이전 글)   

병렬처리가 필요한 부분은 정산 내역을 저장하는 Step 이다. 하지만 위처럼 스크립트를 쓰면, Job 전체에 병렬이 적용된다. 따라서 정산 내역 저장 이후, Redis 를 저장/삭제하는 Step 도 함께 병렬 처리 된다. 

 

1번 스레드가 작업을 완료하고, 데이터를 지우고 있다고 해보자. 이때, 5번 스레드는 아직 작업 중이라면?

1번 스레드가 가맹점 일부를 삭제해서, 5번 스레드는 해당 가맹점을 못찾는 문제가 발생 할 수 있다.

Task Executor 방법으로 하면, 정산 내역 저장 Step 에만 병렬을 적용하면 된다. 따라서, 위의 문제는 일어나지 않는다. 

병렬 처리에 필요한 작업 

1) 멤버 변수를 변경하는 작업이 있는가? 있다면, static 하지 않게 바꿔준다.

->  현재 코드는 non-static 하게 구성되어 있어서, 바꿀 것이 없었다. 

 

2) JPACursorItemReader → JpaPagingItemReader 변경

 

Cursor 는 기본적으로 싱글 스레드이다. DB 커서를 한개만 열기 때문이다. 

반면에, Paging 방식에선, 페이지네이션으로 여러 스레드가 다른 페이지에 접근할 수 있어서 병렬 처리가 가능하다. 

대신 paging 은 읽는 데이터가 늘어날 수록 offset 이 커져서, 속도가 느려지는 경향이 있다 (이때, page size 를 충분히 크게 하여.  commit 주기를 줄이면 도움이 된다)

 

사실 두 리더 모두, 대량 처리에는 적합하지 않다고 분류된 글을 보았다.

하지만 지난 작업에서 JpaCursorItemReader 를 사용했을 때, 읽기 시간이 20초 정도로 적었기 때문에 성능면에서는 문제가 없었다.

읽기가 조금 늘어나더라도, 현재 Processor 가 핵심 병목이기 때문에, 감수할 수 있다고 판단했다. ZeroOffsetReader 같은 것을 별도로 생성하지는 않았다.

코드

1)Reader 가 병렬 처리 가능하도록 변경 

무상태 처리를 해줘야, 멀티 쓰레드에서 사용 가능함.

The implementation is thread-safe in between calls to AbstractItemCountingItemStreamItemReader.open(ExecutionContext), but remember to use saveState=false if used in a multi-threaded client (no restart available)

출처: 스프링 공식 문서 

 

As-Is 

    @Bean
    @StepScope
    fun transactionReaderV1(
	@Value("#{jobParameters[taxYear]}") taxYear: Long = 0,
        @Value("#{jobParameters[fromId]}") fromId: Long = 0,
        @Value("#{jobParameters[toId]}") toId: Long = 0
    ): JpaCursorItemReader<transactionDto> {
        if (taxReturntransactionRepository.existsByTaxYear(taxYear = Year) {
            throw taxYearAlreadyExecutedException(“taxYear: $year already executed")
        
        return JpaCursorItemReaderBuilder<TransactionDto>()
            .name("transactionReaderV1)
            .entityManagerFactory(entityManagerFactory)
            .queryString(
                """
                    select new com.example.batch.business.taxreturn.dto.transactionDto(
                          member,
                          t.transactionAmount,
                          t.transactionType,
                          t.id,
                          td.cardAcceptorName,
                          td.cardAcceptorCode
                           )
                    from transaction t
                    join transactionDetail td
                    on t.id = td.transaction_id 
                    join wallet w
                    on w.id = t.id 
                    where tdtransaction between :fromDate and :toDate
                    and t.id >= :fromId and t.id <= :toId
                    """.trimIndent()
            ).parameterValues(
                mapOf(
                    "fromDate” to taxYear.fromDate,
                    "toDate” to taxYear.toDate,
                    "fromId" to fromId,
                    "toId" to toId
                )
            )
            .build()
    }

 

To-Be 

    @Bean
    @StepScope
    fun transactionReaderV2(
        @Value("#{jobParameters[taxYear]}") taxYear: Long = 0,
        @Value("#{jobParameters[fromId]}") fromId: Long = 0,
        @Value("#{jobParameters[toId]}") toId: Long = 0,
    ): JpaPagingItemReader<TempAuthorizationTransactionDto> {
    
        if (taxReturntransactionRepository.existsByTaxYear(taxYear = Year) {
            throw taxYearAlreadyExecutedException(“taxYear: $year already executed")
        
        return JpaPagingItemReaderBuilder<TransactionDto>()
            .name("transactionReaderV2")
            .entityManagerFactory(entityManagerFactory)
            .queryString(
                """
                    select new com.example.batch.business.taxreturn.dto.TransactionDto(
                          member,
                          t.transactionAmount,
                          t.transactionType,
                          t.id,
                          td.cardAcceptorName,
                          td.cardAcceptorCode
                    )
                    from transaction t
                    join transactionDetail td
                    on t.id = td.transaction_id 
                    join wallet w
                    on w.id = t.id 
                    where tdtransaction between :fromDate and :toDate
                    and t.id >= :fromId and t.id <= :toId
                    """.trimIndent()
            ).parameterValues(
                mapOf(
                    "fromDate” to taxYear.fromDate,
                    "toDate” to taxYear.toDate,
                    "fromId" to fromId,
                    "toId" to toId
                )
            )
            .build()
            .saveState(false)
            .pageSize(pageSize)
            .build()
    }

 

이때, maxPoolSize 는 Hikari Pool Size 보다 적게 설정해야 SQL connection timeout 나지 않는다. (application.yml 설정 확인) 

    @Bean
    fun customTaskExecutor(): ThreadPoolTaskExecutor {
        val executor = ThreadPoolTaskExecutor()
        executor.corePoolSize = maxPoolSize
        executor.maxPoolSize = maxPoolSize
        executor.setThreadNamePrefix("taxreturn-batch-thread")
        executor.initialize()
        return executor
    }

    @Bean
    @JobScope
    fun saveTransactionStepV2() = stepBuilderFactory["saveTransactionStepV2"]
        .chunk<TransactionDto, List<TransactionDto>>(chunkSize)
        .reader(transactionReaderV2())
        .processor(checkTaxReturnTypeByAcceptorProcessorV2())
        .writer(taxReturnTransactionWriterV2(taxReturnTransactionRepository))
        .taskExecutor(customTaskExecutor())
        .throttleLimit(maxPoolSize)
        .build()

출처 

https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/database/JpaPagingItemReader.html

Comments