Notice
Recent Posts
Recent Comments
Link
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
Tags more
Archives
Today
Total
관리 메뉴

ultra_dev

스프링 배치 - 2편(DB) 본문

SPRING&JAVA

스프링 배치 - 2편(DB)

ultra_dev 2024. 3. 25. 22:13

DB를 바탕으로 한 스프링 배치

기본 개념

: 배치는 실시간 처리가 어려운 대용량 데이터를 다루고 DB의 I/O 성능 문제 및 메모리 자원 효율성 문제를 고려 해야한다

→ 스프링 배치에서는 대용량 데이터 처리 위한 2가지 방법을 제시한다!

 


  1. Cursor Based 처리
  • JDBC ResultSet의 기본 메커니즘 사용
  • AbstractPagingItemReader 클래스
  • 현재 행에 커서 유지하며 다음 데이터 호출 시, 다음 행으로 커서 이동하며 데이터 반환이 이루어지는 streaming 방식의 I/O
    • ResultSet이 Open 될때마다 next() 메소드 호출 돼서 데이터베이스의 데이터가 반환되고 객체와 매핑이 이루어짐
  • DB Connection이 연결되면 배치 처리가 완료될 때까지 데이터를 읽어오기 때문에 DB와 SocketTimeout을 충분히 큰 값 설정해야 한다!
  • 단점 1 : 모든 결과를 메모리에 할당하기 때문에 메모리 사용량이 많아진다는 단점 존재
    (반면 Connection 연결 유지 시간과 메모리 공간이 충분하다면 대량 데이터 처리에 적합할 수 있다!)
  • 단점 2 : 스레드 안정성을 보장하지 못함. 멀티 스레드 환경에서 사용 시, 동시성 이슈 발생하지 않도록 별도 동기화 처리 필요
    <참고> 예시로 밑의 코드를 보면 커서 베이스와 달리 Page Based의 페이징 아이템 리더의 경우
    doRead()에서 동기화처리가 되고 있음을 알 수 있음!
// 동기화 처리가 된 페이징 아이템 리더의 예시 코드

	@Override
	protected T doRead() throws Exception {

		synchronized (lock) {

			if (results == null || current >= pageSize) {

				if (logger.isDebugEnabled()) {
					logger.debug("Reading page " + getPage());
				}

				doReadPage();
				page++;
				if (current >= pageSize) {
					current = 0;
				}

			}

			int next = current++;
			if (next < results.size()) {
				return results.get(next);
			}
			else {
				return null;
			}

		}

	}

 

 

2. Paging Based 처리

  • 페이징 단위로 데이터 조회, 페이징 사이즈만큼 메모리에 가져와서 한 개씩 읽는다!
  • 한 페이지 읽을 때마다 Connection을 맺고 끊기 때문에 대량의 데이터 처리하더라도 SocketTimeout이 발생하지는 않는다.
  • Offset, LImit
  • 즉 페이징 단위의 결과만 메모리에 할당하니까 메모리 사용량이 적어지는 것
  • 따라서 Connection 연결 유지 시간이 길지 않고, 메모리 공간을 효율적으로 사용해야 한다면 적합할 수 있다!
  • 주의사항으로 Paging을 사용할 것이라면 데이터가 정렬이 되어있어야 한다(order by)
    • 각 페이지마다 새로운 쿼리를 실행하는 거니 페이징 시 결과 정렬 하는 것이 중요함
    • 따라서 결과의 순서가 보장될 수 있도록 order by 권장하는 것



ItemReader(Cursor 기반)

  • JdbcCursorItemReader
    • 개념 : 커서 기반 JDBC 구현체, ResultSet과 함께 사용되며 Datasource에서 Connection을 얻어와서 SQL 실행
    @Bean
    public JdbcCursorItemReader itemReader(){
    
    	return new JdbcCursorItemReaderBuilder<T>()
    		.name("cursorItemReader")
    		.fetchSize(int chunkSize) // 커서 방식으로 데이터 가지고 올 때, 메모리 할당 크기 (청크 사이즈)
    		.dataSource(DataSource) // 디비 접근 데이터소스 설정
    		.rowMapper(RowMapper) // 쿼리 결과로 반환되는 데이터와 객체 매핑 위한 로우매퍼 설정
    		.beanRowMapper // 별도의 로우매퍼 설정 안하고 클래스타입 설정 시 자동으로 객체와 매핑
    		.queryArguments(Object... args) // 쿼리 파라미터 설정 (위의 쿼리에 추가해서 동적으로 쿼리 생성 가능하겠지?)
    		.maxItemCount(int count) // 조회할 최대 아이템 수 (여기까지만 조회하고 끝내겠다 느낌. 청크랑은 다름)
    		.currentItemCount(int count) // 조회 아이템의 시작 지점
    		.maxRows(int maxRows) // ResultSet 오브젝트가 포함할 수 있는 최대행 수
    		.build();

예시

@Bean
public JdbcCursorItemReader<ProjectWriteDomain> itemReader(){
    return new JdbcCursorItemReaderBuilder<ProjectWriteDomain>()
        .name("cursorItemReader")
        .fetchSize(100) // 한 번에 가져올 데이터의 크기 설정 (청크 사이즈)
        .dataSource(dataSource) // 데이터베이스 접속에 사용할 DataSource 설정
        .rowMapper(new BeanPropertyRowMapper<>(ProjectWriteDomain.class)) // ResultSet의 각 행을 Domain 객체와 매핑하기 위한 RowMapper 설정
        .query("SELECT * FROM projects WHERE status = ?") // 실행할 쿼리 설정
        .queryArguments("ACTIVE") // 쿼리 파라미터 설정
        .maxItemCount(1000) // 최대로 읽어올 아이템의 개수 설정
        .maxRows(1000) // ResultSet이 최대로 포함할 수 있는 행의 개수 설정
        .build();
}




  • JpaCursorItemReader
    • 개념 : 커서 기반 JPA 구현체, EntityManagerFactory 객체가 필요하며 쿼리는 JPQL 사용
    • 문제점 : 데이터를 DB에서 모두 읽고 서비스 인스턴스에서 직접 Iterator로 cursor로 동작하는 것처럼 흉내 내는 방식..!
      모든 데이터를 메모리에 들고 있기 때문에 OOM을 유발한다!!
    • API
    @Bean
    public JpaCursorItemReader itemReader(){
    	return new JpaCursorItemReaderBuilder<T>()
    		.name("cursorItemReader")
    		.queryString(String JQPL) // ItemReader가 조회할 때 사용할 JPQL 문장 설정
    		.EntityManagerFactory(EntityManagerFactory) // JPQL 실행 엔티티매니저 생성 팩토리
    		.parameterValue(Map<String, Object> parameters) // 쿼리 파라미터 설정
    		.maxItemCount(int count) // 조회할 최대 아이템 수
    		.currentItemCount(int count) // 조회 아이템의 시작 지점
    		.build();
    
    

 

@Bean
public JpaCursorItemReader<ProjectWriteDomain> itemReader(EntityManagerFactory entityManagerFactory) {
    
    HashMap<String, Object> parameters = new HashMap<>();
    parameters.put("status", "ACTIVE");
    
    return new JpaCursorItemReaderBuilder<ProjectWriteDomain>()
            .name("cursorItemReader")
            .queryString("SELECT p FROM ProjectWriteDomain p WHERE p.status = :status") // 실행할 JPQL 쿼리 설정
            .entityManagerFactory(entityManagerFactory) // JPQL 실행 엔티티 매니저 생성 팩토리
            .parameterValues(parameters) // 쿼리 파라미터 설정
            .maxItemCount(1000) // 최대로 읽어올 아이템의 개수 설정
            .build();
}

 

 

 

ItemReader(Page 기반)

  • JdbcPagingItemReader
    • 개념 : 페이징 기반 JDBC 구현체, 쿼리에 시작행 번호(offset)과 페이지에서 반환할 행 수(limit) 지정
      • 스프링 배치에서 offset과 limit을 페이지 사이즈에 맞게 자동으로 생성, 페이징 단위로 조회 시 새로운 쿼리 실행
      • 페이지마다 새로운 쿼리가 실행되기 때문에 데이터 순서 보장 필요!! order by 잊지말자
      • 마찬가지 이유로 멀티스레드 환경에서 Thread 안정성이 보장된다. (커서 기반은 위험)
    • PagingQueryProvider
      • 쿼리 실행에 필요한 쿼리문을 아이템리더에게 제공
      • 데이터베이스마다 페이징 전략이 다르기 때문에 각 db마다 다른 PagingQueryProvider 사용
        • Select, from, sortKey는 필수 설정! where이나 group by는 선택사항
    @Bean
    public JdbcPagingItemReader itemReader(){
    	return new JdbcPagingItemReaderBuilder<T>()
    		.name("pagingItemReader")
    		.pageSize(int pageSize) // 페이지 크기 설정(쿼리 당 요청 레코드 수)
    		.dataSource(DataSource) // 디비 접근 데이터소스
    		.queryProvider(PagingQueryProvider) // DB 페이징 전략에 따른 페이징쿼리프로바이더 설정
    		.rowMapper(Class<T>) // 쿼리 결과로 반환되는 데이터와 객체 매핑 위한 로우매퍼 설정
    		----여기부턴 PagingQueryProvider 역할 하는 것들인데 따로 빼서 쓴다 보통-----
    		.selectClause(String selectClause) // select 절
    		.fromClause(String fromClause) // from 절
    		.whereClause(String whereClause) // where 절
    		.groupClause(String groupClause) // group 절
    		.sortKeys(Map<String, Order> sortKey) // 정렬을 위한 유니크한 키 설정
    		----여기까지 PagingQueryProvider 역할----
    		.parameterValues(Map<String, Object> parameters // 쿼리 파라미터 설정
    		.maxItemCount(int count) // 조회할 최대 아이템 수
    		.currentItemCount(int count) // 조회 아이템의 시작 지점
    		.maxRows(int maxRows) // ResultSet 오브젝트가 포함 가능한 최대 행 수
    		.build();
    
    

<예시>

@Bean
public JdbcPagingItemReader<ProjectWriteDomain> itemReader() {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("status", "ACTIVE");

    return new JdbcPagingItemReaderBuilder<ProjectWriteDomain>()
        .name("pagingItemReader")
        .dataSource(dataSource)
        .pageSize(10)
        .rowMapper(new BeanPropertyRowMapper<>(ProjectWriteDomain.class))
        .queryProvider(createQueryProvider()) // 쿼리 프로바이더 설정
        .parameterValues(parameterValues)
        .pageSize(1000) // 페이지 크기 설정
        .build();
}

@Bean
private PagingQueryProvider createQueryProvider() {
    SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);
    queryProvider.setSelectClause("id, project_name, created_at, status");
    queryProvider.setFromClause("FROM projects");
    queryProvider.setWhereClause("WHERE status = :status");
    ---
    Map<String, Order> sortKetys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);
    ---
    queryProvider.setSortKey(sortKeys); // 정렬 키 설정
    
    
    return queryProvider.getObject();
}

 

 


  •  JpaPagingItemReader
    • 개념 : Paging 기반 JPA 구현체, EntityManagerFactory 객체 필요, JPQL 쿼리 사용
    • JPA 페이징 아이템 리더는 JPA 엔티티와 관련된 JPQL 쿼리를 직접 사용하여 데이터를 읽기 때문에 쿼리 프로바이더가 필요하지 않고, 대신, queryString 메서드를 사용하여 JPQL 쿼리를 설정한다!
  • API
@Bean
public JpaPagingItemReader itemReader(){

	return new JpaPagingItemReaderBuilder<T>()
		.name("pagingItemReader")
		.pageSize(int count) // 페이지 크기 설정 (쿼리 당 요청 레코드 수)
		.queryString(String JQPL) // 아이템리더가 조회할 때 사용할 JPQL
		.EntityManagerFactory(EntityManagerFactory) // 해당 엔티티매니저 생성 팩토리
		.parameterValue(Map<String, Object> parameters) // 쿼리 파라미터 설정
		.build();
@Bean
public JpaPagingItemReader<ProjectWriteDomain> itemReader() {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("status", "ACTIVE");

    return new JpaPagingItemReaderBuilder<ProjectWriteDomain>()
        .name("pagingItemReader")
        .entityManagerFactory(entityManagerFactory)
        .pageSize(10)
        .queryString("SELECT p FROM ProjectWriteDomain p WHERE p.status = :status")
        .parameterValues(parameterValues)
        .build();
}

ItemWriter

  • JdbcBatchItemWriter
    • 개념 :데이터소스 지정, sql 쿼리 설정
    • Jdbc의 배치 기능 사용하여 bulk insert/update/delete 방식으로 처리(청크 단위)
      • 단건 처리가 아닌 일괄 처리를 하기 때문에 성능상 이점 존재
    • API
public JdbcBatchItemWriter itemWriter(){
	return new JdbcBatchItemWriterBuilder<T>()
		.name(String name)
		.datasource(Datasource) 
		.sql(String sql)
		.assertUpdates(boolean) // 트랜잭션 이후 최소 한개 이상 행을 업데이트,삭제하지 않을 경우 예외 발생여부, 디폴트는 true
		--택1--
		.beanMapped() // Pojo 기반으로 insert SQL의 Values를 매핑
		.columnMapped() // Key,Value 기반으로 insert SQL의 Values를 매핑
		------
		.build();
		

  • JpaItemWriter
    • 개념 : JPA 엔티티 기반으로 데이터 처리, 엔티티매니저팩토리 주입 받아 사용
      • 엔티티를 하나씩 Chunk 크기만큼 모았다가 insert 혹은 merge한 다음 flush 한다.
      • 아이템리더나 아이템프로세서로부터 아이템 전달 받을 때는 Entity 클래스타입으로 받아야 한다.
    • API
    @Bean
    public JpaItemWriter itemWriter() {
    	return new JpaItemWriterBuilder<T>()
    		.usePersist(boolean) // 엔티티를 persist()할 것인지 여부, false면 merge() 처리
    		.entityManagerFactory(EntityManagerFactory) 
    		.build();
    

<참고>
ItemReader로 100개를 받았고 해당 아이템들의 필드값을 a로 바꾸고 싶을 때,

JpaItemWriter에서 엔티티 상태변경을 통해 수정하면 쿼리는 100번 나간다.

만약 jdbcBatchItemWriter의 벌크 업데이트 쿼리였다면 1번만 나간다.

 

 


 

스프링 배치 예외 처리

 

 

FaultTolerant

  • 스프링 배치는 오류가 발생해도 Step이 즉시 종료되지 않고 Retry 혹은 Skip 기능을 통해 내결함성 서비스가 가능하다.
  • Skip : ItemReader/ ItemProcessor/ ItemWriter에 적용 가능
  • Retry: ItemProcessor/ ItemWriter에 적용 가능!
@Bean 
public Step batchStep() {
	.<I,O>chunk(10)
	.reader(ItemReader)
	.writer(ItemWriter)
	------------------------ 관련 설정
	.falutTorlerant()
	.skip(Class<? extends Throwable> type) // Skip할 예외 타입 설정
	.skipLimit(int skipLimit) // Skip 제한 횟수 설정(리더,프로세서,라이터 횟수 합)
	.skipPolicy(SkipPolicy skipPolicy) // 스킵을 어떤 조건, 기준으로 할건지 설정
	.noSkip(Class<? extends Throwable> type) // 예외 발생 시, 스킵하지 않을 예외 타입 설정
	.retry(Class<? extends Throwable> type) // 예외 발생 시 Retry할 예외 타입 설정
	.retryLimit(int retryLimit) // Retry 제한 횟수 설정
	.retryPolicy(RetryPolicy retryPolicy) // 리트라이 어떤 조건, 기준으로 할건지 설정
	.backOffPolicy(BackOffPolicy backOffPolicy) // 다시 리트라이 하기까지의 지연시간(단위:ms) 설정
	.noRetry(Class<? extends Throwable> type) // 예외 발생 시 리트라이 하지 않을 예외 타입 설정
	.noRollBack(Class<? extends Throwable> type) // 예외 발생 시 롤백하지 않을 예외 타입 설정
	.build();
	------------------------
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(2000); // 지정한 시간만큼 대기 후 재시도 한다.

SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(2); // 최대 2번 재시도 한다.

 

Skip

  • 데이터 처리하는 동안 설정된 Exception이 발생했을 경우, 해당 데이터 처리를 건너 뛰는 기능
  • 사소한 오류 같은 건 실패 처리 대신 Skip해서 배치 수행의 빈번한 실패 줄일 수 있다.

Retry

  • 아이템프로세서, 아이템라이터에 설정된 Exception이 발생했을 경우, 지정 정책에 따라 데이터 처리 재시도!
  • 스킵과 마찬가지로 Retry 함으로써 배치수행의 빈번한 실패를 줄일 수 있다.

 

 


 

스프링 배치 멀티 스레드 프로세싱

  • 개념 : 프로세스 내 특정 작업 처리하는 스레드가 하나일 경우 단일 스레드, 여러개일 경우 멀티 스레드
    • 일반적으로 복잡한 처리 혹은 대용량 데이터 작업 시에는 멀티 스레드 방식 선택
      -> 하지만 데이터 동기화 이슈가 존재하기 때문에 신중하게 선택해야 한다.

스프링 배치 스레드 모델

  • 스프링 배치는 기본적으로 단일 스레드 방식으로 작업을 처리하지만, 성능 향상과 대규모 데이터 작업을 위한 비동기 처리 및 Scale out 기능을 제공한다.
  1. AsyncItemProcessor / AsyncItemWriter
    • ItemProcessor에게 별도의 스레드가 할당되어 작업을 처리하는 방식
  2. Multi-threaded Step
    • Step 내 Chunk 구조인 ItemReader, ItemProcessor, ItemWriter마다 여러 스레드가 할당되어 실행하는 방법
  3. Remote Chunking
    • 분산 환경처럼 Step 처리가 여러 프로세스로 분할되어 외부의 다른 서버로 전송되어 처리
  4. Parallel Steps
    • Step마다 스레드가 할당되어 여러개의 Step을 병렬로 실행
  5. Partitioning
    • Master/Slave 방식으로서 Master가 데이터를 파티셔닝한 다음 각 파티션에게 스레드를 할당하여 Slave가 독립적으로 작동하는 방식

 

AsyncItemProcessor / AsyncItemWriter

 

기본 개념

  • Step 안에서 ItemProcessor가 비동기적으로 동작하는 구조
  • AsyncItemProcessor와 AsyncItemWriter가 함께 구성돼야 함!
  • AsyncItemProcessor로부터 AsyncItemWriter가 받는 최종 결과값은 List<Future<T>> 타입이며 비동기 실행이 완료할 때까지 대기한다
  • spring-batch-integration 의존성 필요!!
implementation 'org.springframework.batch:spring-batch-integration'
@Bean @JobScope 
public Step evaluationMailingStep(@Value("#{jobParameters[customer]}") String customer) throws Exception { 
return new StepBuilder(StepName.MAILING_STEP+"_"+customer, jobRepository) 
.<Assessment, Future<Assessment>>chunk(CHUNK_SIZE, platformTransactionManager) 
.reader(evaluationMailingReader()) 
// 비동기 실행을 위한 AsyncItemProcessor 설정 
// 스레드 풀 개수만큼 스레드가 생성되어 비동기로 실행된다. 
// 내부적으로 실제 ItemProcessor에게 실행을 위임하고 결과를 Future에 저장한다. 
.processor(asyncItemProcessor()) 
// 비동기 실행 결과 값들을 모두 받아오기까지 대기 
// 내부적으로 실제 ItemWriter에게 최종 결과값을 넘겨주고 실행을 위임한다. 
.writer(asyncItemWriter()) .build(); 
	} 
@Bean @StepScope 
public AsyncItemWriter<Assessment> asyncItemWriter() { 
AsyncItemWriter<Assessment> asyncItemWriter = new AsyncItemWriter<>(); 
asyncItemWriter.setDelegate(evaluationMailingWriter()); return asyncItemWriter; 
}
  • 예를 들어 리더에서 200개의 아이템을 읽어오고, 프로세서에서 5개의 스레드가 이를 나눠가지고 비동기적으로 처리
    → 각각의 처리 결과는 Future에 저장
  • 모든 처리가 완료되면 이 결과들이 라이터에 넘겨져서 최종적으로 처리
  • 이런 느낌으로 병렬, 비동기 처리를 통해 속도를 향상시킬 수 있다.

Multi-threaded Step

개념 : Step 내에서 멀티 스레드로 Chunk 기반 처리가 이루어지는 구조

  • ItemReader는 Thread-safe한지 반드시 확인해야함 ex)페이징아이템리더
    • why? 데이터를 읽어오는 역할이기 때문에 스레드마다 중복 데이터 읽어오지 않도록 동기화 보장돼야함!
  • 스레드마다 새로운 chunk가 할당되어 데이터 동기화가 보장됨!
    → 즉 스레드끼리 Chunk를 서로 공유하지 않음
@Bean
public Step step() throws Exception{

    return new StepBuilder("step", jobRepository)
		.<Customer,Customer>chunk(100)
		// thread-safe한 아이템리더 설정!
		.reader(pagingItemReader())
		.processor(customerItemProcessor())
		.writer(customerItemWrier())
		// 스레드 생성 및 실행을 위한 taskExecutor 설정
		.taskExecutor(taskExecutor())
		.build();

 

 

Parallel Steps

 

개념 : 여러 개의 Flow들을 병렬적으로 실행하는 구조

  • 실행이 다 완료된 후 FlowExecutionStatus 결과들을 취합해서 다음 단계 결정을 한다.
@Bean
public Job job()
    return new JobBuilder("job", jobRepository)
		.start(flow1())
		.split(TaskExecutor).add(flow2(), flow3())
		.next(flow4())
		.end()
		.build();

:


1. flow1 생성

2. flow2, flow3 생성 후 총 3개의 flow 합침
3. taskExecutor 에서 flow 개수만큼 스레드 생성해서 각 flow 실행
4. flow4는 split처리가 완료 된 후 실행이된다.

  • 스플릿 처리는 Spring Batch의 기능 중 하나로, 병렬로 실행되어야 하는 단계들을 병렬로 실행하고 결과를 통합하는 역할!

 

 

Partitioning

 

개념 : MasterStep이 SlaveStep을 실행시키는 구조

  • SlaveStep은 각 스레드에 의해 독립적으로 실행이 됨
  • SlaveStep은 독립적인 StepExecution 파라미터 환경을 구성함
  • SlaveStep은 ItemReader / ItemProcessor / ItemWriter 등을 가지고 동작하며 작업을 독립적으로 병렬 처리
  • MasterStep은 Partition Step이며 SlaveStep은 TaskletStep, FlowStep등이 올 수 있음!
public Step step() throws Exception {
    return new StepBuilder("masterStep", jobRepository)
    // PartitionStep 생성을 위한 PartitionStepBuilder가 생성되고 Partitioner를 설정
		.partitioner("slaveStep", new ColumnRangePartitioner())
		// 슬레이브 역할을 하는 스텝 설정 : TaskletStep, FlowStep 등
		.step(slaveStep())
		// 파티션 구분을 위한 값 설정 : 몇개의 파티션으로 나눌 것인지
		.gridSize(4)
		// 스레드 풀 실행자 설정(스레드 생성 및 스레드 풀 관리)
		.taskExecutor(ThreadPoolTaskExecutor())
		.build();
		
	

 

 

 


 

 

SynchronizedItemStreamReader

 

개념

  • Thread-safe 하지 않은 ItemReader를 Thread-safe하게 처리하도록 하는 역할을 한다.
  • 스프링배치 4.0부터 지원!
@Bean
public JdbcCursorItemReader<ProjectWriteDomain> itemReader(){
    JdbcCursorItemReader jdbcCursorItemReader
    = new JdbcCursorItemReaderBuilder<ProjectWriteDomain>()
        .name("cursorItemReader")
        .fetchSize(100) // 한 번에 가져올 데이터의 크기 설정 (청크 사이즈)
        .dataSource(dataSource) // 데이터베이스 접속에 사용할 DataSource 설정
        .rowMapper(new BeanPropertyRowMapper<>(ProjectWriteDomain.class)) // ResultSet의 각 행을 Domain 객체와 매핑하기 위한 RowMapper 설정
        .query("SELECT * FROM projects WHERE status = ?") // 실행할 쿼리 설정
        .queryArguments("ACTIVE") // 쿼리 파라미터 설정
        .maxItemCount(1000) // 최대로 읽어올 아이템의 개수 설정
        .maxRows(1000) // ResultSet이 최대로 포함할 수 있는 행의 개수 설정
        .build();

	return new SynchronizedItemStreamReaderBuilder<Customer>()
		.delegate(jdbcCursorItemReader)
		.build();
}
  • SynchronizedItemStreamReader는 스레드 안전을 위한 동기화 처리를 해주고
    데이터 처리는 JdbcCursorItemReader에게 위임한다.

 

 


 

스프링 배치 이벤트 리스너

  • 리스너는 배치 흐름 중에 Job, Step, Chunk 단계의 실행 전후에 발생하는 이벤트를 받아 용도에 맞게 활용할 수 있도록 제공하는 인터셉터 개념의 클래스
  • 각 단계별로 로그 기록 남기거나 소요 시간 계산 혹은 실행 상태 정보들을 참조 및 조회 가능
  • 이벤트를 받기 위해서는 리스너를 등록해야 하며, 각 단계 별로 지정 가능

Listeners

  • Job
    • JobExecutionListener - Job 실행 전후
  • Step
    • StepExecutionListner - Step 실행 전후
    • ChunkListener Chunk - 실행 전후 (Tasklet 실행 전후), 오류 시점
    • ItemReadListener - ItemReader 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
    • ItemProcessListener - ItemProcessor 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
    • ItemWriteListener - ItemWriter 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
    • SkipListener - 읽기, 쓰기, 처리 Skip 실행 시점, Item 처리가 Skip 될 경우 Skip 된 item 추적
    • RetryListener - Retry 시작, 종료, 에러 시점

JobExecutionListener 예시

@Configuration
public class StopWatchJobListener implements JobExecutionListener {

    @Override
    public void afterJob(JobExecution jobExecution) {
        LocalDateTime startTime = jobExecution.getStartTime();
        LocalDateTime endTime = jobExecution.getEndTime();

        if (startTime != null && endTime != null) {
            Duration duration = Duration.between(startTime, endTime);
            long seconds = duration.getSeconds();

            System.out.println("================================================");
            System.out.println("총 소요시간: " + seconds + "초");
            System.out.println("================================================");
        } else {
            System.out.println("Job이 실행되지 않았거나 종료 시간이 기록되지 않았습니다.");
        }
    }

    @Bean
    public Job evaluationMailingJob(Step evaluationMailingStep
    ){
        return new JobBuilder("testJob", jobRepository)
            .listener(new StopWatchJobListener())
            .start(step1)
            .build();
    }

 


 

<참고>

스프링배치 5 버전 이후(부트 3 이후)에서는 밑에 방식으로 바뀜

    @Bean
    public Job myJob(Step step) {
        return this.jobBuilderFactory.get("myJob")
                .start(step)
                .build();
    }
    
    
	@Bean
    public Job myJob(JobRepository jobRepository, Step step) {
        return new JobBuilder("myJob", jobRepository)
                .start(step)
                .build();
    }

밑에 방식처럼 바뀜 why? 
Builder에서는 JobRepository가 생성되고 설정된다는 사실을 숨기고 있기 때문

 

 

Comments