Page tree
Skip to end of metadata
Go to start of metadata




청크(chunk) 기반으로 동작하는 각 스텝(step)은 ItemReader, ItemProcessor, ItemWriter 로 구성된다.

스프링 배치는 거의 모든 유형의 입력 데이터를 처리할 수 있는 표준 방법을 제공하며, 커스텀 리더를 개발하는 기능도 제공한다.

ItemReader 인터페이스

org.springframework.batch.item.ItemReader<T> 인터페이스는 스텝에 입력을 제공할 때 사용하는 read 라는 단일 메서드를 정의한다.

package org.springframework.batch.item;
public interface ItemReader {

	T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

ItemReader 인터페이스는 전략 인터페이스 (Strategy Interface) 이며, 스프링 배치는 처리할 입력 유형에 맞는 여러 구현체를 제공한다.

파일 입력

개발자는 스프링이 제공하는 리더에 읽으려는 데이터의 포맷을 선언하기만 하면 리더가 나머지 작업을 알아서 처리한다.

플랫 파일

FlatFileItemReader

두 개의 메인 컴포넌트로 이루어 진다 : Resource, LineMapper

Resource : 읽어들일 대상 파일

LineMapper : 스프링JDBC 에서 RowMapper 가 담당하는 것과 비슷한 역할. 파일의 레코드를 객체로 변환한다.

  • LineTokenizer : 레코드를 FieldSet 으로 변환
  • FieldSetMapper : FieldSet을 사용하려는 도메인 객체로 매핑

 


XML

StaxEventItemReader

XML 문서 내 각 섹션을 독립적으로 파싱하는 StAX 파서를 사용한다. 따라서 아이템 기반 읽기가 가능하다.

스프링 배치는 사용자가 지정한 XML 프래그먼트를 도메인 객체로 파싱한다.

*XML 프래그먼트 : 시작 태그부터 종료 태그까지의 XML 블록


JSON

JsonItemReader

JSON 청크를 읽어서 객체로 파싱한다. 실제 파싱작업은 JsonObjectReader 인터페이스의 구현체에게 위임된다.

*JsonObjectReader 구현체는 두 가지 제공된다 : Jackson, Gson


데이터베이스 입력

JDBC

스프링 프레임워크에서는 JDBC 같은 고충이 따르는 부분을 캡슐화 하여 제공해준다.

스프링이 제공하는 JdbcTemplate 을 사용하면 전체 ResultSet 에서 한 row 씩 순서대로 가져오면서 모든 로우를 도메인 객체로 변환해 메모리에 적재한다.

스프링 배치는 이에 대한 대안으로 한번에 처리할 만큼의 레코드만 로딩하는 별도의 두 가지 기법을 제공하는데, 바로 커서(cursor)페이징(paging) 이다.

커서 : 표준 ResultSet 으로 구현된다. ResultSet 이 open 되면 next() 메서드를 호출할 때마다 데이터베이스에서 배치 레코드를 가져와 반환한다.

페이징 : 데이터베이스에서 페이지라고 부르는 청크 크기만큼의 레코드를 가져오는 것이다. 각 페이지는 해당 페이지만큼의 레코드만 가져올 수 있는 고유한 SQL 쿼리를 통해 생성된다.


커서/페이지 기반 JDBC 리더를 구현하려면 다음 두 가지 작업이 필요하다.

  1. 필요한 쿼리를 실행할 수 있는 리더 (ex. JdbcCursorItemReader, JdbcPagingItemReader)
  2. RowMapper 구현체


커서 기반 처리

JdbcCursorItemReader

  • 특정 상황에서는 레코드를 스트리밍하는 것이 괜찮은 방법이나, 백만 단위의 레코드를 처리할 때라면 매번 요청을 할 때마다 네트워크 오버헤드가 추가된다.
  • ResultSet 은 스레드 안전이 보장되지 않으므로 다중 스레드 환경에서는 사용할 수 없다. 

→ 이러한 단점 때문에 페이징을 선택하게 된다.


페이징 기반 처리

JdbcPagingItemReader

  • 레코드 처리 자체는 커서 기반 처리와 같으나, DB 에서 가져오는 방법이 다르다. 페이징 기법은 한 번에 SQL 쿼리 하나를 실행해 레코드 하나를 가져오는 대신, 각 페이지마다 새로운 쿼리를 실행한 뒤 쿼리 결과를 한번에 메모리에 적재한다.

JdbcPagingItemReader 를 구성하기 위한 네 가지 의존성

  1. 데이터 소스
  2. PagingQueryProvider 구현체
  3. RowMapper 구현체
  4. 페이지 크기



하이버네이트

스프링 배치가 제공하는 하이버네이트 기반 ItemReader 는 커밋할 때 세션을 플러시하며 하이버네이트를 웹 기반으로 처리할 때보다는 좀 더 배치 처리에 관계가 있는 추가 기능을 제공한다.

다른 시스템에서 사용하는 용도로 이미 하이버네이트 객체 매핑을 사용하고 있는 환경에서는, 스프링 배치의 하이버네이트 기반 ItemReader를 사용하면 필요한 배치를 효율적으로 시작하고 실행할 수 있다.


하이버네이트로 커서 처리하기

  1. sessionFactory
  2. 엔티티 매핑
  3. HibernateCursorItemReader 구성
  4. 하이버네이트 의존성 추가


하이버네이트로 페이징 처리하기

  1. sessionFactory
  2. 엔티티 매핑
  3. HibernatePagingItemReader 구성
  4. 페이지 크기 설정
  5. 하이버네이트 의존성 추가

* 참고 : 스프링 배치 하이버네이트 관련 이슈

https://github.com/spring-projects/spring-batch/pull/713


JPA

ORM 영역에서 표준화된 접근법을 제공한다. 그러나 JPA 는 커서 기반 조회방법은 제공하지 않는다.

JPA 를 사용할 때 신경써야 할 유일한 부분은 ItemReader 를 구성하는 것이다. (JpaPagingItemReader)

JpaPagingItemReader 사용할 때 필요한 의존성

  1. ExecutionContext 내 엔트리의 접두어로 사용되는 이름
  2. 스프링 부트가 제공하는 entityManager
  3. 실행할 쿼리
  4. 파라미터



저장 프로시저

StoredProcedureItemReader

대다수 엔터프라이즈 환경에서 관계형 데이터베이스는 온갖 비즈니스 목적에 맞춰 사용되는 복잡한 저장 프로시저가 포함된 코드의 생태계다.

스프링 데이터

스프링 데이터의 목적은 "기본적인 데이터 저장소의 특징을 유지하면서도, 친숙하고 일관된 스프링 기반의 데이터 접근 프로그래밍 모델을 제공하는 것" 이다.

스프링 데이터 프로젝트들은 사용자가 개별 NoSQL과 SQL데이터 저장소의 고유 기능에 접근할 수 있게 하면서도, 일관된 추상화의 집합(Repository 인터페이스)를 제공한다.

몽고DB

MongoItemReader

페이지 기반 ItemReader 로, 쿼리를 전달받으며 몽고DB 서버에서 페이지 단위로 가져온 데이터를 반환한다. 다음 의존성이 필요하다.

  1. MongoOperations 구현체
  2. name (saveState 가 true 일 때 일반적인 스테이트풀한 스프링 배치 컴포넌트처럼 실행 상태를 저장하는데 필요)
  3. targetType
  4. Json 기반 쿼리 또는 Query 인스턴스


스프링 데이터 리포지터리

RepositoryItemReader

스프링 데이터는 스프링 데이터가 제공하는 특정 인터페이스(ex.PagingAndSortingRepository) 중 하나를 상속하는 인터페이스를 사용자가 정의하기만 하면 스프링 데이터가 해당 인터페이스의 구현을 처리하는 기능을 제공한다.



기존 서비스

ItemReaderAdapter

기존 스프링 서비스를 호출해서 ItemReader 에 데이터를 공급할 수 있다.

ItemReaderAdapter 는 다른 엘리먼트를 래핑해서 스프링 배치가 해당 엘리먼트와 통신할 수 있게 하는 데 사용된다.


커스텀 입력

ItemReader 인터페이스를 구현하고 read() 메서드를 작성하면 된다.

만약 스프링 배치가 JobRepository 에 리더의 상태를 저장해서 이전에 종료된 지점부터 리더를 다시 시작할 수 있게 하려면 추가로 ItemStream 인터페이스를 구현해야 한다.


에러 처리

레코드 건너뛰기

  • 어떤 조건에서 레코드를 건너뛸 것인가 (어떤 예외를 무시할 것인가)
  • 얼마나 많은 레코드를 건너뛸 것인가

를 고려하여 설정한다.

@Bean
public Step copyFileStep() {
	return this.stepBuilderFactory.get("copyFileStep")
				.<Customer, Customer>chunk(10)
				.reader(customerItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skip(Exception.class) // 건너 뛸 예외
				.noSkip(ParseException.class) // 건너뛰지 말아야 할 예외
				.skipLimit(10) // 건너 뛸 횟수
}

위와 같은 방법 외에도 SkipPolicy 라는 인터페이스를 구현하여 설정할 수도 있다.


잘못된 레코드 로그 남기기

ItemListenerSupport 를 사용하고 onReadError 메서드를 오버라이드해서 발생한 에러를 기록한다.

또는 메서드에 @OnReadError 애너테이션을 추가한 POJO 를 사용한다.

입력이 없을 때의 처리

입력을 읽지 못했을 때 스텝을 실패로 처리하거나 이메일을 보내는 것 같은 다른 처리를 하려면 StepListener 를 사용한다. 





  • No labels