ultra_dev
스프링 배치 - 1편 본문
스프링 배치 - 1
* 배치 패턴
Reader : DB 등에서 다량의 데이터 조회
Processor : 받은 데이터 가공
Writer : 데이터를 다시 저장
* 배치 과정
1. 배치 프로세스 주기적으로 실행
2. 동시 다발적인 Job 배치처리
3. 순차적인 Step 처리
4. 실패 시, 스케줄링에 의한 재시작 (또는 수동...)
5. 반복
참고 사항) 만약 스프링부트 3버전 이상(스프링 배치 5버전 이상)을 사용하고 복수개의 Job 정의시에 아마 밑에 오류가 뜰 것!
[Job name must be specified in case of multiple jobs~]
Spring Boot 3 버전에서부터 Spring Batch 사용 시
기존에는 스프링 부트가 구동될 때 모든 Job 빈들을 읽어 실행하는 구조였다면, 새로운 버전에서부터는 복수 개의 Job이 컨텍스트 내에 정의되어 있다면 부트 구동 시점에 가동시킬 Job을 프로프티에 명시해야한다. (하나만 등록 가능)
만약 컨텍스트 내에 하나의 Job만 존재한다면, 해당 Job은 별도의 명시 없이 부트 구동 시점에 실행된다.
1. CLI 등을 통해 JobParameter를 전달할 때, 정의된 모든 Job에게 동일한 JobParameter가 전달되는 문제가 있다.
2. Job 구동 순서가 불분명해지고, 각 Job들이 실행되는 로그가 섞이게 된다.
3. 그 밖에도 여러가지 이유로 부트 구동 시점에 여러 개의 Job을 실행시키는 것은 이점보단 단점이 더 크다.
해결 예시 코드)
properties에 아래와 같은 방식으로 정의한다.
# batch#spring.batch.job.name=userDeleteJobBean
+ 추가 참고 사항
- 스프링 배치 활성화(배치5, 부트3버전 이전의 경우)
- @EnableBatchProcessing
- 스프링부트배치 설정 클래스 실행 → 빈 등록된 Job 초기화 및 수행
- 3 버전 이후부터 @EnableSpringBatch 안붙혀도 된다!!
- @EnableBatchProcessing
* 스프링 배치 메타 데이터
- 배치의 실행 및 관리를 목적으로 Job,Step 등 관련 정보들을 저장,업데이트,조회할 수 있는 스키마
- 성공 실패 여부 등을 볼 수 있어서 관리 용이, DB와 연동할 경우 필수적으로 생성 돼야함
- ALWAYS, EMBEDDED, NEVER 3종류가 있다.
- 스프링 부트의 기본 설정값은 EMBEDDED로 내장 DB 쓸 때만 스키마가 자동으로 새로 생성 된다
- ALWAYS는 항상 스키마 새로 생성
- NEVER은 스키마 생성 X → 내장 DB를 쓸 경우 생성이 안되면 당연히 오류 발생!!
* Job
- 배치 계층 구조의 최상위
- 배치 작업을 어떻게 구성하고 실행할 것인지 등을 설정한 객체
- 여러 Step 포함, 1개 이상의 Step으로 구성해야함
- JobInstance
- Job의 설정과 구성이 동일해도 실행 시점에 처리하는 내용이 다르니 이걸 구분한 실행 단위 객체(Job Parameter로 구분)
- Job과 일대다관계
- JobExecution
- Job 실행 중 발생 정보들 저장하고 있는 객체
- COMPLETED면 JobInstance가 완료된 걸로 간주 → 재실행 불가
- FAILED면 재실행 가능(JobParameter가 동일한 Job 실행해도 JobInstance 계속 실행 가능)
→ 즉 JobExecution이 Completed될때까지 하나의 JobInstance에서 여러번 시도가 생길 수도 있다!
* Step
- Job을 구성하는 독립적인 단계, 실제 배치 처리 정의
- StepExecution
- step 실행 중에 발생 정보 저장한 객체
- Job이 재시작 되더라도 이미 성공한 step은 재실행 X, 실패한 step만 실행!
- 즉 step의 stepExecution이 모두 성공해야 jobExecution이 성공한 것이다
- ExecutionContext
- 맵 형식으로 저장돼서 StepExecution이나 JobExecution 객체의 상태 저장
- 스텝 범위라면 스텝 끼리 공유가 안되고, 잡 범위라면 잡 간에 공유가 안되고 그 내부의 스텝끼리만 공유가 됨
- Job 재시작 시, 이미 처리했던 Row 데이터는 건너 뛰고 이후부터 수행하고자할 때 활용
- JobLauncher
- 배치 Job 실행 시키는 역할
- Job과 Job Parameters를 인자로 받는다.
- 동기적으로 실행 시 : [Client] run() → [JobLauncher] execute() → [Job] → [Business]
→ (다시 되돌아오기) → [Job] ExitStatus → [JobLauncher] JobExecution → [Client]
* FINISHED or FAILED 최종 완료 후 응답 값 반환
- 비동기적으로 실행 시 : [Client] run() → [JobLauncher] JobExecution → [Client]
- 일단, 즉시 응답값 반환 UNKNOWN
- 물론 그다음 작업은 진행 중!!
[JobLauncher] execute() → [Job] →[Business]
→ (다시 되돌아오기) → [Job] ExitStatus
* JobRepository
- 배치 작업 정보 저장소
- JobLauncher, Job, Step 구현체 내부에서 CRUD 작업
- 트랜잭션매니저, isolation 수준, 테이블 Prefix, varchar 등 설정 가능
* API 설정
- preventRestart()
- Job의 재시작 여부를 설정
- 기본 값은 true이며 false로 설정 시, 이 job은 재시작을 지원하지 않는다는 의미
- 만약 잡이 실패해도 재시작 안되고, 재시작 하려고 하면 JobRestartException 발생
- incrementer()
- 잡 파라미터에서 필요한 값을 증가시켜서 다음에 사용할 잡파라미터 리턴
- 즉 기존의 잡 파라미터 변경 없이 Job 여러번 실행하고 싶을 때 사용
* Task vs Chunk
스프링 배치에서 Step의 실행 단위는 크게 2가지로 나뉘어진다
- Chunk 기반
- 하나의 큰 덩어리를 n개씩 나눠서 실행, 대량 처리 시 효과적
- ItemReader, ItemProcessor, ItemWriter를 사용하며 청크 기반 전용 Tasklet인 ChunkOrientedTasklet 구현체가 제공
- [Job] → [TaskletStep] → [RepeatTemplate] →(Loof)Transaction{ [ChunkOrientedTasklet] → [ItemReader],[ItemProcessor],[ItemWriter] }
- Task 기반
- 단일 작업 기반으로 처리되는 것이 더 효율적인 경우(아이템리더, 아이템라이터 사용 x)
- 주로 Tasklet 구현체 만들어서 사용
- 대량 처리하는 경우 chunk 기반 보다 복잡한 구현 필요
- [Job] → [TaskletStep] → [RepeatTemplate] →(Loof)Transaction{ [Tasklet] → [Business Logic] }
* JobStep
- Job에 속하는 Step에 외부 Job을 포함하고 있는 Step
- 외부 잡이 실패하면 스텝이 실패하는거라 기본 잡도 당연 실패
- 메타데이터는 기본 잡이랑 외부 잡 별로 각각 저장된다
→ 작은 모듈로 쪼개서 Job의 흐름을 관리하고자 할 때 사용
* FlowJob
- Step을 순차적으로만 구성하는 것이 아니라 특정 상태에 따라 흐름 전환 하도록 하고 싶은 경우!
- Step이 실패해도 Job은 실패로 끝나지 않도록 해야 하는 경우
- Step이 성공했을 때는 특정 Step 실행하게 하고 싶은 경우
- 특정 Step 실행되지 않게 해야하는 경우
- Flow와 Job의 흐름에만 관여하고 실제 비즈니스 로직은 Step에서!
- 기존 방식(SimpleJob)은 순차적으로 스텝 1 → 2 → 3으로 진행되면 중간에 실패하면 전체 Job이 실패하고 뒤의 스텝은 실행되지 않지만 FlowJob은 흐름에 따라 분기처리 가능!!
- ExitStatus도 최종 flow의 상태가 됨
.start(Flow) -> 여기에 스텝을 넣으면 SimpleJobBuilder, Flow 넣으면 JobFlowBuilder가 반환
.next(Step or Flow or JobExecutionDecider)
.on(String pattern) -> TransitionBuilder 반환
.to(Step or Flow or JobExecutionDecider)
.stop()/fail()/end()/stopAndRestart(Step or Flow or JobExecutionDecider)
.end()
.build();
이런식으로 만들기 가능
- Transition
- Flow 내 Step의 조건부 전환!
- on 메소드 호출해서 TransitionBuilder가 반환되면 Transition Flow 구성 가능
- Step의 종료상태(ExitStatus)가 입력한 패턴과 매칭 안되면 예외 발생 + 잡 실패
- API
- on(String pattern)
- 스텝의 실행 결과로 돌려받는 종료상태(ExitStatus)와 매칭하는 스키마임!
- “*” : 모든 종료상태와 매칭
- 스텝의 실행 결과로 돌려받는 종료상태(ExitStatus)와 매칭하는 스키마임!
- to()
- 다음으로 실행할 단계 지정
- from()
- 이전 단계에서 정의한 Transition을 새롭게 추가 정의
- 병렬 느낌..!?
.start(step1()) .on("A") .to(step2()) -------단계 1을 실행해서 종료 상태가 A면 단계 2 실행 .on("*") .stop() -------단계 2 성공하면 종료 상태와 상관 없이 작업 중단 .from(step1()) .on(*) .to(step3()) -------단계 1 실행해서 종료 상태와 상관 없이 단계 3 실행 .next(step4()) -------단계 3 성공 시, 단계 4 실행 .on("*") .end() -------단계 4 실행해서 종료 상태와 상관 없이 작업 종류
- on(String pattern)
참고) ExitStatus도 커스텀 가능하다! FINISHED, FAIL 이런 것 외에도 원하는 거 넣어서 추가 조작 용이하게
* JobExecutionDecider
- ExitStatus 조작하거나 StepExecutionListener 등록할 필요 없이 Transition 처리 위한 전용 클래스
.start(step1())
.next(deecider())
.from(decider()).on("ODD").to(oddStep())
.from(decider()).on("EVEN").to(evenStep())
.end()
.build();
@Bean
Public JobExecutionDecider decider() {
return new OddDecider();
}
public static class OddDecider implements jobExecutionDecider {
private int count = 0;
@Override
public FlowExecutionStatus decide(JobExecution jobExecution
, StepExecution stepExecution){
count ++;
if(count % 2 == 0) {
return new FlowExecutionStatus("EVEN");
}else {
return new FlowExecutionStatus("ODD");
}
}
}
* FlowStep
- Step 내에 Flow 할당하여 실행시키는 객체
- flowStep의 BatchStatus와 ExitStatus는 Flow의 최종 상태값에 따라 결정된다.
public Step flowStep(){
.
.
.
.flow(flow()) //Step 내에서 실행 될 flow 설정, FlowStepBuilder 반환
.build();
}
* Scope
스프링 배치 스코프
- @JobScope, @StepScope
- 잡과 스텝의 빈 생성, 실행에 관여
- 프록시 모드를 기본 값으로 하는 스코프
- 해당 스코프가 선언되면 빈 생성은 어플리케이션 구동 시점이 아니라! 빈의 실행 시점에 이루어진다
- @Values를 주입해서 빈의 실행 시점에 값 참조 가능(일종의 Lazy Binding)
- @Values를 사용할 경우 빈 선언 문에 @JobScope, @StepScope를 정의하지 않으면 오류를 발생하므로 반드시 정의 해야 한다!
- 즉, @JobScope는 Step 선언문에 정의하고
- @StepScope는 Tasklet이나 아이템리더, 프로세서, 라이터 선언문에 정의 한다.
- 프록시 모드로 빈 선언되기 때문에 어플리케이션 구동 시 프록시 객체 생성 → 실행 시점에 실제 빈 호출
- 병렬 처리시에도 각 스레드마다 생성된 스코프 빈이 할당되기 때문에 스레드에 안전하게 실행 가능
@Bean @JobScope
public Step evaluationMailingStep(@Value("#{jobParameters[customer]}") String customer) {
return new StepBuilder(StepName.MAILING_STEP+"_"+customer, jobRepository)
.
.
.
}
* 스프링 배치 청크 프로세스
기본 개념
- Chunk : 여러개의 아이템을 묶은 하나의 덩어리, 블록
- 한번에 하나씩 아이템을 입력 받아서 Chunk 단위의 덩어리로 만들고, Chunk 단위로 트랜잭션을 처리한다
- 즉 Chunk 단위로 커밋, 롤백이 이루어짐
- 일반적으로 대용량 데이터를 한번에 처리하는 것이 아니라 청크 단위로 쪼개서 더 이상 처리할 데이터가 없을 때까지 반복해서 입출력하는데 사용!
<개별 vs 일괄>
- 아이템리더와 아이템 프로세서는 청크 내 개별 아이템 처리
- 아이템라이터는 청크 크기만큼 아이템 일괄 처리!!
ChunkProvider
- 아이템리더를 사용해서 아이템을 청크 사이즈만큼 읽어서 청크 단위로 만들어 제공하는 객체
- 아이템을 1개씩 읽어서 청크에 add
ChunkProcessor
- 아이템 프로세스서를 사용해서 아이템을 가공, 필터링 등 작업 후 아이템라이터를 사용해서 청크 데이터를 저장, 출력!
- 만약 아이템 프로세서 설정 안한 경우는 아이템리더에서 읽은 아이템 그대로가 Chunk에 저장되있음!
- 아이템프로세서 설정 시, 처리 완료된 List<Item>을 아이템라이터에게 전달 → 아이템라이터 처리가 완료되면 청크 트랜잭션이 종료!
ItemReader
- 다양한 입력으로부터 데이터 읽어서 제공하는 인터페이스
- csv,txt부터 xml,json이나 DB 혹은 메시지큐 서비스 등 읽기 가능
- 각각에 맞는 Reader 사용하기
- csv,txt부터 xml,json이나 DB 혹은 메시지큐 서비스 등 읽기 가능
- 아이템 단위로 작업
ItemProcessor
- 데이터 출력 전에 가공, 변형, 필터링 하는 역할
- 선택사항이다.
- 아이템리더 및 라이터와 분리되어 비즈니스 로직 구현 가능
- 아이템리더로부터 받은 아이템을 특정 타입으로 변환해서 아이템라이터로 넘겨줄 수 있다.
- 혹은 필터과정 거쳐서 원하는 아이템만 넘겨줄 수 도 있다.
ItemWriter
- 청크 단위로 데이터 받아서 일괄 출력 작업을 위한 인터페이스
- 아이템 하나가 아니라 아이템 리스트를 전달 받는다
<정리>
Chunk Size 내에서 Item 한건 씩 처리가 이루어진다.
ItemReader VS ItemProcessor
: 아이템리더에서 아이템을 한 개 읽고 나서 아이템 프로세서로 바로 전달하는게 아니라!
청크 사이즈만큼 아이템을 모두 읽은 다음! 아이템 프로세서에게 전달하면 읽은 아이템 갯수만큼 반복처리한다.
이후 아이템 라이터에서 청크 사이즈만큼 List<Item>을 전달 받아서 일괄적으로 출력작업을 한다.
즉 Chunk Size는 곧 Commit 간격이 된다!
'SPRING&JAVA' 카테고리의 다른 글
스프링 배치 - 3편 (간단한 성능 개선기, JPA) (0) | 2024.03.30 |
---|---|
스프링 배치 - 2편(DB) (0) | 2024.03.25 |
Jenkins & Docker를 활용한 CI/CD(Mac os 기준) (0) | 2024.02.28 |
AOP, Custom Annotation 활용기 (0) | 2023.11.07 |
View Table + JPA 매핑 (0) | 2023.11.06 |