



일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- 카우치코딩 #couchcoding #6주포트폴리오 #6주협업프로젝트v
- Morphological analysis #Corpus
- SSR
- 비동기
- Machine Learning
- #스파르타코딩클럽후기 #내일배움캠프후기
- 플젝후체크
- github markdown
- Kakao
- expression statement is not assignment or call html
- 출처: 자바의 신 8장
- gitbash
- PID
- 클라이언트사이드렌더링
- github
- 파이콘
- 카우치코딩 #couchcoding #6주포트폴리오 #6주협업프로젝트
- 마크다운
- address
- Anaconda
- 코딩온라인
- khaiii
- 서버사이드렌더링
- Technical Writing
- 자바파이썬
- 파이썬
- 필사
- terminate
- 모바일웹스킨
- taskkill
- Today
- Total
개발 일기
스프링 배치 병렬 처리 가능하도록 변경하기. 본문
이전 편: https://writerroom.tistory.com/379
상황
공제 가맹점이라는 테이블 특성 상, Hit 가 적을 수 밖에 없기에 여기서는 성능 개선의 여지가 적다고 판단했다.
다시 로직을 보면, 아래 6개의 과정으로 분류할 수 있다.
팀과 논의 후, 슬로워 쿼리 개선 /캐시 적용 등을 이미 해보았으니, DB 부하 감소로는 최적화의 여지가 없다고 판단했다.
현재 로직이 이미 복잡하기에, 작업을 멀티 쓰레드에서 동시에 실행해서 처리 속도를 높여보기로 했다
[로직]
1)올해 국내 결제 내역 조회(Reader)
*2-5는 Processor
2)결제 건의 가맹점 번호를 기반으로 공제 타입 확인
3)이미 연말정산 된 거래인지 중복 확인
4)할인,그룹, 거래 타입 분류 등 기타 상세 내역 확인
5)정산에 필요한 정보를 여러 테이블에서 조합
6)저장 (Writer)
병렬 처리
배치를 병렬로 구성할 때, 두 가지 방법을 생각해보았다.
코드 변경 포인트가 많은지, 코드 제어가 얼마나 가능한지 등을 중점적으로 보았다.
방법 | 장점 | 단점 |
배치 코드 자체에 적용 (Task Executor) |
-Spring Batch 내부에서 병렬 처리되므로 관리 용이 -배치 실행 중 실패 시 Spring Batch의 Retry & Skip 기능 활용 가능 -하나의 Job 안에서 컨트롤 가능 |
Reader 등 모든 요소가 병렬 처리 가능하도록 변경 필요 |
'&' 을 사용하며 fromId-toId 로 물리적으로 데이터 범위 나눠 실행 (젠킨스 스크립트) |
위 방법보다 구현이 간단하다. | -상세한 코드 제어가 어려움 -(이전 글) preloadStep 에서 모든 가맹점 데이터를 레디스에 한번 올리고, 작업 후에 삭제한다. 여러 스레드에서 이 Step 을 병렬로 하다보면, 데이터 정합성 문제가 생긴다. |
2번 방식으로 하면, 아래와 같은 식으로 스크립트를 짜면 된다.
& 연산자는 리눅스/유닉스 기반 쉘 스크립트에서 사용되며, 특정 명령어를 백그라운드에서 실행할 때 활용된다.
Jenkins 에서 Build Step을 추가하면서, Execute Shell 에 아래와 같이 쓰면 여러 프로세스를 병렬로 실행할 수 있다.
java -jar batch-job.jar --fromId=1 --toId=10000 &
java -jar batch-job.jar --fromId=10001 --toId=20000 &
java -jar batch-job.jar --fromId=20001 --toId=30000 &
하지만 위의 단점에서 언급한 것처럼, Redis 의 데이터를 사용 후 삭제 하는 과정에서 정합성 문제가 생긴다 (이전 글)
병렬처리가 필요한 부분은 정산 내역을 저장하는 Step 이다. 하지만 위처럼 스크립트를 쓰면, Job 전체에 병렬이 적용된다. 따라서 정산 내역 저장 이후, Redis 를 저장/삭제하는 Step 도 함께 병렬 처리 된다.
1번 스레드가 작업을 완료하고, 데이터를 지우고 있다고 해보자. 이때, 5번 스레드는 아직 작업 중이라면?
1번 스레드가 가맹점 일부를 삭제해서, 5번 스레드는 해당 가맹점을 못찾는 문제가 발생 할 수 있다.
Task Executor 방법으로 하면, 정산 내역 저장 Step 에만 병렬을 적용하면 된다. 따라서, 위의 문제는 일어나지 않는다.
병렬 처리에 필요한 작업
1) 멤버 변수를 변경하는 작업이 있는가? 있다면, static 하지 않게 바꿔준다.
-> 현재 코드는 non-static 하게 구성되어 있어서, 바꿀 것이 없었다.
2) JPACursorItemReader → JpaPagingItemReader 변경
Cursor 는 기본적으로 싱글 스레드이다. DB 커서를 한개만 열기 때문이다.
반면에, Paging 방식에선, 페이지네이션으로 여러 스레드가 다른 페이지에 접근할 수 있어서 병렬 처리가 가능하다.
대신 paging 은 읽는 데이터가 늘어날 수록 offset 이 커져서, 속도가 느려지는 경향이 있다 (이때, page size 를 충분히 크게 하여. commit 주기를 줄이면 도움이 된다)
사실 두 리더 모두, 대량 처리에는 적합하지 않다고 분류된 글을 보았다.
하지만 지난 작업에서 JpaCursorItemReader 를 사용했을 때, 읽기 시간이 20초 정도로 적었기 때문에 성능면에서는 문제가 없었다.
읽기가 조금 늘어나더라도, 현재 Processor 가 핵심 병목이기 때문에, 감수할 수 있다고 판단했다. ZeroOffsetReader 같은 것을 별도로 생성하지는 않았다.
코드
1)Reader 가 병렬 처리 가능하도록 변경
무상태 처리를 해줘야, 멀티 쓰레드에서 사용 가능함.
The implementation is thread-safe in between calls to AbstractItemCountingItemStreamItemReader.open(ExecutionContext), but remember to use saveState=false if used in a multi-threaded client (no restart available)
출처: 스프링 공식 문서
As-Is
@Bean
@StepScope
fun transactionReaderV1(
@Value("#{jobParameters[taxYear]}") taxYear: Long = 0,
@Value("#{jobParameters[fromId]}") fromId: Long = 0,
@Value("#{jobParameters[toId]}") toId: Long = 0
): JpaCursorItemReader<transactionDto> {
if (taxReturntransactionRepository.existsByTaxYear(taxYear = Year) {
throw taxYearAlreadyExecutedException(“taxYear: $year already executed")
return JpaCursorItemReaderBuilder<TransactionDto>()
.name("transactionReaderV1)
.entityManagerFactory(entityManagerFactory)
.queryString(
"""
select new com.example.batch.business.taxreturn.dto.transactionDto(
member,
t.transactionAmount,
t.transactionType,
t.id,
td.cardAcceptorName,
td.cardAcceptorCode
)
from transaction t
join transactionDetail td
on t.id = td.transaction_id
join wallet w
on w.id = t.id
where tdtransaction between :fromDate and :toDate
and t.id >= :fromId and t.id <= :toId
""".trimIndent()
).parameterValues(
mapOf(
"fromDate” to taxYear.fromDate,
"toDate” to taxYear.toDate,
"fromId" to fromId,
"toId" to toId
)
)
.build()
}
To-Be
@Bean
@StepScope
fun transactionReaderV2(
@Value("#{jobParameters[taxYear]}") taxYear: Long = 0,
@Value("#{jobParameters[fromId]}") fromId: Long = 0,
@Value("#{jobParameters[toId]}") toId: Long = 0,
): JpaPagingItemReader<TempAuthorizationTransactionDto> {
if (taxReturntransactionRepository.existsByTaxYear(taxYear = Year) {
throw taxYearAlreadyExecutedException(“taxYear: $year already executed")
return JpaPagingItemReaderBuilder<TransactionDto>()
.name("transactionReaderV2")
.entityManagerFactory(entityManagerFactory)
.queryString(
"""
select new com.example.batch.business.taxreturn.dto.TransactionDto(
member,
t.transactionAmount,
t.transactionType,
t.id,
td.cardAcceptorName,
td.cardAcceptorCode
)
from transaction t
join transactionDetail td
on t.id = td.transaction_id
join wallet w
on w.id = t.id
where tdtransaction between :fromDate and :toDate
and t.id >= :fromId and t.id <= :toId
""".trimIndent()
).parameterValues(
mapOf(
"fromDate” to taxYear.fromDate,
"toDate” to taxYear.toDate,
"fromId" to fromId,
"toId" to toId
)
)
.build()
.saveState(false)
.pageSize(pageSize)
.build()
}
이때, maxPoolSize 는 Hikari Pool Size 보다 적게 설정해야 SQL connection timeout 나지 않는다. (application.yml 설정 확인)
@Bean
fun customTaskExecutor(): ThreadPoolTaskExecutor {
val executor = ThreadPoolTaskExecutor()
executor.corePoolSize = maxPoolSize
executor.maxPoolSize = maxPoolSize
executor.setThreadNamePrefix("taxreturn-batch-thread")
executor.initialize()
return executor
}
@Bean
@JobScope
fun saveTransactionStepV2() = stepBuilderFactory["saveTransactionStepV2"]
.chunk<TransactionDto, List<TransactionDto>>(chunkSize)
.reader(transactionReaderV2())
.processor(checkTaxReturnTypeByAcceptorProcessorV2())
.writer(taxReturnTransactionWriterV2(taxReturnTransactionRepository))
.taskExecutor(customTaskExecutor())
.throttleLimit(maxPoolSize)
.build()
출처
'Tech > 배워서 남주기' 카테고리의 다른 글
Redis Pipelining 으로 26만개 데이터 한번에 Insert 하기 (0) | 2024.11.04 |
---|---|
Redis Clustering 를 통해 FailOver 구현하기 (도커 컴포즈 환경) (0) | 2023.01.25 |
Insert가 많을 때 소요 시간 개선하기 - (1) myBatis forEach 적용 (0) | 2023.01.16 |
[JPA/MYSQL] 재고에 동시 접근할 때 일어나는 갱신 분실 문제 해결하기 (0) | 2023.01.09 |
협업에 용이한 작업 환경 구성하기 (0) | 2022.11.30 |