Spring Batch

Spring Batch

생성일
Mar 3, 2023 06:21 AM
최종 편집 일시
Last updated October 16, 2024
태그
JAVA
SPRING

1. 배치란?

  • 큰 단위의 작업을 일괄 처리
  • 대부분 처리량이 많고 비 실시간성 처리에 사용
    • 대용량 데이터 계산, 정산, 통계, 데이터베이스, 변환 등
  • 컴퓨터 자원을 최대로 활용
    • 컴퓨터 자원 사용이 낮은 시간대에 배치를 처리하거나 배치만 처리하기 위해 사용자가 사용하지 않는 또 다른 컴퓨터 자원을 사용
  • 사용자 상호작용으로 실행되기 보단 스케쥴러와 같은 시스템에 의해 실행되는 대상
    • 예를 들어 오전 10시에 배치 실행, 매주 월요일 12시마다 실행
    • crontab, jenkins… etc

1-1. 스프링 배치란?

  • 배치 처리를 하기 위한 Spring Framework 기반 기술
    • Spring에서 지원하는 기술 적용 가능
    • DI, AOP, 서비스 추상화
  • 스프링 배치의 실행 단위인 Job과 Step
  • 비교적 간단한 작업(Tasklet)단위 처리와, 대량 묶음(Chunk)단위 처리
 
package fastcampus.spring.batch.springbatchexample; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableBatchProcessing public class SpringBatchExampleApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchExampleApplication.class, args); } }
@EnableBatchProcessing 어노테이션을 통해 배치 애플리케이션으로 설정
 
package fastcampus.spring.batch.springbatchexample.part1; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @RequiredArgsConstructor @Slf4j public class HelloConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job helloJob() { return jobBuilderFactory.get("helloJob") .incrementer(new RunIdIncrementer()) .start(this.helloStep()) .build(); } @Bean public Step helloStep() { return stepBuilderFactory.get("helloStep") .tasklet((contribution, chunkContext) -> { log.info("hello spring batch"); return RepeatStatus.FINISHED; }).build(); } }
configuration 클래스를 위와 같이 생성.
배치의 기본 단위는 Job이고 Job 내부에 step을 두어 동작을 관리.
step 안에서는 tasklet을 통해 내부 동작을 작성.
 
spring: batch: job: names: ${job.name:NONE}
application.yml에 위와 같은 설정을 추가하면 실행 인수에 —job.name=Job이름 을 넣어서 원하는 job만 실행시킬 수 있다. 위처럼 NONE을 설정해 두면 의도치 않게 애플리케이션 실행 시 전체 배치 잡이 도는 것을 방지할 수 있다.
 
notion image

스프링 배치 기본 구조 - Job

JobLauncer가 Job을 실행시켜줌.
Job은 배치의 실행 단위.
Job은 N개의 Step을 실행할 수 있으며, 흐름(Flow)을 관리할 수 있다.
  • 예를 들어, A 스텝 실행 후 조건에 따라 B 스텝 또는 C 스텝을 실행하도록 관리 가능.
 
스텝은 잡의 세부 실행 단위이며, N개가 등록 돼 실행된다.
스텝의 실행 단위는 크게 2가지로 나눌 수 있다.
  1. Chunk 기반: 하나의 큰 덩어리를 n개씩 나눠서 실행
  1. Task 기반: 하나의 작업 기반으로 실행
Chunk 기반 스텝은 ItemReader, ItemProcessor, ItemWriter가 있다.
  • Item은 배치 처리 대상 객체를 의미
ItemReader는 배치 처리 대상 객체를 읽어 ItemProcessor 또는 ItemWriter에게 전달한다.
  • 예를 들어 파일 또는 DB에서 데이터를 읽는다.
ItemProcessor는 input 객체를 output 객체로 filtering 또는 processing 해 ItemWriter에게 전달한다.
  • 예를 들어 ItemReader에서 읽은 데이터를 수정 또는 ItemWriter 대상인지 filtering 한다.
  • ItemProcessor는 optional 하다.
  • ItemProcessor가 하는 일을 ItemReader 또는 ItemWriter가 대신할 수 있다.
ItemWriter는 배치 처리 대상 객체를 처리한다.
  • 예를 들어 DB updtae를 하거나, 처리 대상 사용자에게 알림을 보낸다.
 

스프링 배치 테이블 구조와 이해

notion image
배치 실행을 위한 메타 데이터가 저장되는 테이블
BATCH_JOB_INSTANCE
  • job이 샐행되며 생성되는 최상위 계층의 테이블
  • job_name과 job_key를 기준으로 하나의 row가 생성되며, 같은 job_name과 job_key가 저장될 수 없다.
  • job_key는 BAYCH_JOB_EXECUTION_PARAMS에 저장되는 Parameter를 나열해 암호화해 저장한다.
BATCH_JOB_EXECUTION
  • Job이 실행되는 동안 시작/종료 시간, job 상태 등을 관리
BATCH_JOB_EXECUTION_PARAMS
  • Job을 실행하기 위해 주입된 parameter 정보 저장
BATCH_JOB_EXECUTION_CONTEXT
  • Job이 실행되며 공유해야할 데이터를 직렬화 해 저장
BATCH_STEP_EXECUTION
  • Step이 실행되는 동안 필요한 데이터 또는 실행된 결과 저장
BATCH_STEP_EXECUTION_CONTEXT
  • Step이 실행되며 공유해야할 데이터를 직렬화 해 저장
 
** 이 메타 테이블은 spring-batch-core 아래에 있다. 거기서 db에 맞는 sql을 가지고 와서 테이블 생성.
yml에 initialize-schema 옵션을 통해 실행 스크립트의 옵션을 설정할 수 있다.
always, embedded, never가 있는데 앞의 2개는 개발환경, never는 운영환경에 보통 쓴다고 한다. (운영에서는 실행 스크립트를 수동으로 관리하겠다는 뜻)
 

배치 클래스 이해

  • JobInstance: BATCH_JOB_INSTANCE 테이블과 매핑
  • JobExecution: BATCH_JOB_EXECUTION 테이블과 매핑
  • JobParameters: BATCH_JOB_EXECUTION_PARAMS 테이블과 매핑
  • ExecutionContext: BATCH_JOB_EXECUTION_CONTEXT 테이블과 매핑
 
  • JobInstance의 생성 기준은 JobParameters 중복 여부에 따라 생성
  • 다른 parameter로 Job이 실행되면, JobInstance가 생성
  • 같은 parameter로 Job이 실행되면, 이미 생성된 JobInstance가 실행
  • JobExecution은 항상 새롭게 생성
    • 예를 들어
    • 처음 Job 실행 시 date parameter가 1월1일로 실행 됐다면, 1번 잡 인스턴스 생성
    • 다음 잡 실행 시 date parameter가 1월 2일로 실행 됐다면, 2번 잡 인스턴스 생성
    • 다음 잡 실행 시 date parameter가 1월 2일로 실행 됐다면, 2번 잡 인스턴스 재실행
      • 이 때 잡이 재실행 대상이 아닌 경우 에러 발생
  • 잡을 항상 새로운 잡 인스턴스가 실행 될 수 있도록 RunIdIncrementer 제공
    • RunIdincrementer는 항상 다른 run.id를 Parameter로 설정
 
local mysql로 batch 실행해보기 위해 application-mysql.yml 생성
spring: datasource: hikari: jdbc-url: jdbc:mysql://localhost:3306/spring_batch?characterEncoding=UTF-8&serverTimezone=Asia/Seoul driver-class-name: com.mysql.cj.jdbc.Driver username: smwef password: 1234 batch: jdbc: initialize-schema: never
 
  • StepExecution: BATCH_STEP_EXECUTION 테이블과 매핑
  • ExecutionContext: BATCH_STEP_EXECUTION_CONTEXT 테이블과 매핑
 

예제 만들어 보기

  • Job 내에서 공유할 수 있는 BATCH_JOB_EXECUTION_CONTEXT
  • 하나의 Step 안에서 공유할 수 있는 BATCH_STEP_EXECUTION_CONTEXT
package fastcampus.spring.batch.springbatchexample.part2; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @RequiredArgsConstructor @Slf4j public class SharedConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job sharedJob() { return jobBuilderFactory.get("sharedJob") .incrementer(new RunIdIncrementer()) .start(this.sharedStep()) .next(this.sharedStep2()) .build(); } @Bean public Step sharedStep() { return stepBuilderFactory.get("sharedStep") .tasklet((contribution, chunkContext) -> { StepExecution stepExecution = contribution.getStepExecution(); ExecutionContext stepExecutionContext = stepExecution.getExecutionContext(); stepExecutionContext.putString("stepKey", "step execution context"); JobExecution jobExecution = stepExecution.getJobExecution(); JobInstance jobInstance = jobExecution.getJobInstance(); ExecutionContext jobExecutionContext = jobExecution.getExecutionContext(); jobExecutionContext.putString("jobKey", "job execution context"); JobParameters jobParameters = jobExecution.getJobParameters(); log.info("jobName : {}, stepName : {}, parameter: {}", jobInstance.getJobName(), stepExecution.getStepName(), jobParameters.getLong("run.id")); return RepeatStatus.FINISHED; }) .build(); } @Bean public Step sharedStep2() { return stepBuilderFactory.get("sharedStep2") .tasklet((contribution, chunkContext) -> { StepExecution stepExecution = contribution.getStepExecution(); ExecutionContext stepExecutionContext = stepExecution.getExecutionContext(); JobExecution jobExecution = stepExecution.getJobExecution(); ExecutionContext jobExecutionContext = jobExecution.getExecutionContext(); log.info("jobKey : {}, stepKey : {} ", jobExecutionContext.getString("jobKey", "emptyJobKey"), stepExecutionContext.getString("stepKey", "emptyStepKey")); return RepeatStatus.FINISHED; }) .build(); } }
2023-03-03 18:29:25.251 INFO 87093 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=sharedJob]] launched with the following parameters: [{run.id=1}] 2023-03-03 18:29:25.295 INFO 87093 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [sharedStep] 2023-03-03 18:29:25.325 INFO 87093 --- [ main] f.s.b.s.part2.SharedConfiguration : jobName : sharedJob, stepName : sharedStep, parameter: 1 2023-03-03 18:29:25.335 INFO 87093 --- [ main] o.s.batch.core.step.AbstractStep : Step: [sharedStep] executed in 39ms 2023-03-03 18:29:25.354 INFO 87093 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [sharedStep2] 2023-03-03 18:29:25.379 INFO 87093 --- [ main] f.s.b.s.part2.SharedConfiguration : jobKey : job execution context, stepKey : emptyStepKey 2023-03-03 18:29:25.385 INFO 87093 --- [ main] o.s.batch.core.step.AbstractStep : Step: [sharedStep2] executed in 31ms 2023-03-03 18:29:25.398 INFO 87093 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=sharedJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 125ms
하나의 Job 안에서는 jobExecutionContext를 공유하기 때문에 sharedStep2에서 jobKey를 찍어봤을 때 sharedStep에서 설정한 jobKey인 “job execution context”가 찍힌 모습.
stepExecutionContext는 step 내부에서만 공유가되기 때문에 sharedStep1에서 설정한 stepKey는 sharedStep2와는 무관하다는 것을 알 수 있음.
 

Task 기반 배치와 Chunk 기반 배치

  • 배치를 처리하는 2가지 방법
  • Tasklet을 이용한 Task 기반 처리
    • 배치 처리 과정이 비교적 쉬운 경우 쉽게 사용
    • 대량 처리를 하는 경우 더 복잡
    • 하나의 큰 덩어리를 여러 덩어리로 나누어 처리하기 부적합
  • Chunk를 사용한 chunk(덩어리) 기반 처리
    • ItemReader, ItemProcessor, ItemWriter의 관계 이해 필요
    • 대량 처리를 하는 경우 Tasklet보다 비교적 쉽게 구현
    • 예를 들면 10000개의 데이터 중 1,000개씩 10개의 덩어리로 수행
      • 이를 Tasklet으로 처리하면 1만개를 한번에 처리하거나, 수동으로 1000개씩 분할
 
tasklet으로 100개짜리 처리하기.
package fastcampus.spring.batch.springbatchexample.part3; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.ArrayList; import java.util.List; @Configuration @RequiredArgsConstructor @Slf4j public class ChunkProcessingConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job chunkProcessingJob() { return jobBuilderFactory.get("chunkProcessingJob") .incrementer(new RunIdIncrementer()) .start(this.taskBaseStep()) .build(); } @Bean public Step taskBaseStep() { return stepBuilderFactory.get("taskBaseStep") .tasklet(this.tasklet()) .build(); } private Tasklet tasklet() { return ((contribution, chunkContext) -> { List<String> items = getItems(); log.info("ttask item size: {}", items.size()); return RepeatStatus.FINISHED; }); } private List<String> getItems() { List<String> items = new ArrayList<>(); for (int i = 0; i < 100; i++) { items.add(i + " Hello"); } return items; } }
 
notion image
  • reader에서 null을 반환할때까지 Step은 반복됨.
  • <INPUT, OUTPUT>chunk(int)
    • reader에서 INPUT을 return
    • processor에서 INPUT을 받아 processing 후 OUTPUT을 return
      • INPUT, OUTPUT은 같은 타입일 수 있음.