Spring Batch - Multi-threaded Step Execution

In general, Spring Batch runs in a single thread, so everything executes sequentially.
Spring Batch supports several ways to run work in parallel. This time, we will look at one of them: executing a step with multiple threads.

1. Introduction

Spring Batch's multi-threaded step uses Spring's TaskExecutor to process each chunk in a separate thread.

Depending on which TaskExecutor is selected, a new thread may be created for every chunk (SimpleAsyncTaskExecutor), or only the specified number of threads in a thread pool may be reused (ThreadPoolTaskExecutor).



The first thing to do when configuring a multi-threaded step in Spring Batch is to check whether the Reader and Writer you intend to use support multi-threading.

Always check each Reader's and Writer's Javadoc for the phrase "thread-safe".
If it is missing, choose a thread-safe reader and writer instead; if you must use a particular non-thread-safe reader, you can make it thread-safe by wrapping it in SynchronizedItemStreamReader.

Another thing to note is that because chunks run in multiple threads, restarting from the point of failure, one of Spring Batch's great advantages, becomes impossible.

The reason is simple.
In a single-threaded run, chunks execute sequentially, so if the 10th chunk fails, chunks 1 through 9 are guaranteed to have succeeded. In a multi-threaded run, chunks 1 through 10 may all be executing at the same time, so when the 10th chunk fails there is no guarantee that chunks 1 through 9 all succeeded.

So, in general, you set the ItemReader's saveState option to false.

2. PagingItemReader Example

The first case to look at is using a PagingItemReader. Here there is nothing to worry about, because PagingItemReader is thread-safe.

If a batch needs to run multi-threaded, using a PagingItemReader is recommended.

The example code uses JpaPagingItemReader.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class MultiThreadPagingConfiguration {
    public static final String JOB_NAME = "multiThreadPagingBatch";

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    private int chunkSize;

    @Value("${chunkSize:1000}")
    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    private int poolSize;

    @Value("${poolSize:10}") // (1)
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    @Bean(name = JOB_NAME+"taskPool")
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // (2)
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(poolSize);
        executor.setThreadNamePrefix("multi-thread-");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        executor.initialize();
        return executor;
    }

    @Bean(name = JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .start(step())
                .preventRestart()
                .build();
    }

    @Bean(name = JOB_NAME +"_step")
    @JobScope
    public Step step() {
        return stepBuilderFactory.get(JOB_NAME +"_step")
                .<Product, ProductBackup>chunk(chunkSize)
                .reader(reader(null))
                .processor(processor())
                .writer(writer())
                .taskExecutor(executor()) // (2)
                .throttleLimit(poolSize) // (3)
                .build();
    }


    @Bean(name = JOB_NAME +"_reader")
    @StepScope
    public JpaPagingItemReader<Product> reader(@Value("#{jobParameters[createDate]}") String createDate) {

        Map<String, Object> params = new HashMap<>();
        params.put("createDate", LocalDate.parse(createDate, DateTimeFormatter.ofPattern("yyyy-MM-dd")));

        return new JpaPagingItemReaderBuilder<Product>()
                .name(JOB_NAME +"_reader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT p FROM Product p WHERE p.createDate =:createDate")
                .parameterValues(params)
                .saveState(false) // (4)
                .build();
    }

    private ItemProcessor<Product, ProductBackup> processor() {
        return ProductBackup::new;
    }

    @Bean(name = JOB_NAME +"_writer")
    @StepScope
    public JpaItemWriter<ProductBackup> writer() {
        return new JpaItemWriterBuilder<ProductBackup>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }
}


(1) @Value("${poolSize:10}")


The number of threads to create in the thread pool comes from an external property.
With ${poolSize:10}, poolSize defaults to 10 whenever no such property is declared.
The reason to make the pool size adjustable at launch time is to manage the thread pool flexibly per execution environment.
In development it can run with 1 thread, and in production with 10.
You may also suddenly need to reduce the thread count because other batches are running at the same time.
Since the number of threads to spawn can then be decided at execution time, I prefer to keep it external.
The reason to inject it through a setter rather than a field is that otherwise there is no way to set poolSize, chunkSize, and so on when writing test code without a Spring context.
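For example, a plain unit test without a Spring context could then configure the bean class directly. This is a minimal sketch, not code from the original post; the factory arguments would be mocks or test doubles:

// Hypothetical plain-JUnit usage; jobBuilderFactory, stepBuilderFactory and
// entityManagerFactory here would be mocks or test doubles.
MultiThreadPagingConfiguration configuration =
        new MultiThreadPagingConfiguration(jobBuilderFactory, stepBuilderFactory, entityManagerFactory);
configuration.setChunkSize(10); // settable precisely because it is setter-injected
configuration.setPoolSize(2);   // field injection would offer no such hook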

(2) ThreadPoolTaskExecutor


This manages threads through a thread pool.
Options:
corePoolSize: the base size of the pool
maxPoolSize: the maximum size of the pool
If you use SimpleAsyncTaskExecutor instead, a new thread is created for every request.
Threads keep being created, and once the concurrency limit is exceeded subsequent requests block, so it is rarely suitable for production environments.
Please refer to the link for more details.
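For contrast, here is a minimal sketch (not from the original post) of how SimpleAsyncTaskExecutor behaves:

// SimpleAsyncTaskExecutor creates a brand-new thread for every submitted task.
SimpleAsyncTaskExecutor asyncExecutor = new SimpleAsyncTaskExecutor("multi-thread-");
asyncExecutor.setConcurrencyLimit(10); // past 10 concurrent tasks, submitters block

// ThreadPoolTaskExecutor, by contrast, reuses a fixed set of pooled threads,
// which is why the executor() bean above is the safer production choice.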

(3) throttleLimit(poolSize)


The default is 4.
It determines how many of the created threads are actually used for the work.
If you create 10 threads but leave throttleLimit at its default of 4, only 4 of the 10 threads are used by the step.
Generally, corePoolSize, maxPoolSize, and throttleLimit are all set to the same value.

(4) .saveState(false)

As explained earlier, saveState = false is essential when using a reader in a multi-threaded environment.
Turning this option off (false) keeps the Reader from saving the point of failure, so the next execution unconditionally reads from the beginning.
Leaving it on can cause a bigger problem.
Suppose the 8th chunk fails while the 4th chunk has also failed: the 8th is recorded as the restart point, so the next run resumes from the 8th chunk and silently skips the failed 4th.
It is recommended to leave the option false so that a failed run starts again from scratch.
A similar feature exists on the Job: .preventRestart() keeps the Job from being re-executed with the same parameters.
.saveState(false) stops the Reader from recording the point of failure, whereas .preventRestart() prevents the Step from being re-executed at all; strictly speaking, the two are different options.
Now, let's verify with test code that this configuration really runs multi-threaded.

Test code:
All test code uses JUnit 5.
If you are new to writing test code for Spring Batch, please refer to the previous posting.

@ExtendWith(SpringExtension.class)
@SpringBatchTest
@SpringBootTest(classes={MultiThreadPagingConfiguration.class, TestBatchConfig.class})
@TestPropertySource(properties = {"chunkSize=1", "poolSize=2"}) // (1)
public class MultiThreadPagingConfigurationTest {

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private ProductBackupRepository productBackupRepository;

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @AfterEach
    void after() {
        productRepository.deleteAll();
        productBackupRepository.deleteAll();
    }

    @Test
    public void 페이징_분산처리_된다() throws Exception {
        //given
        LocalDate createDate = LocalDate.of(2020,4,13);
        ProductStatus status = ProductStatus.APPROVE;
        long price = 1000L;
        for (int i = 0; i < 10; i++) {
            productRepository.save(Product.builder()
                    .price(i * price)
                    .createDate(createDate)
                    .status(status)
                    .build());
        }

        JobParameters jobParameters = new JobParametersBuilder()
                .addString("createDate", createDate.toString())
                .addString("status", status.name())
                .toJobParameters();
        //when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);

        //then
        assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        List<ProductBackup> backups = productBackupRepository.findAll();
        backups.sort(Comparator.comparingLong(ProductBackup::getPrice));

        assertThat(backups).hasSize(10);
        assertThat(backups.get(0).getPrice()).isEqualTo(0L);
        assertThat(backups.get(9).getPrice()).isEqualTo(9000L);
    }

}
(1) properties = {"chunkSize=1", "poolSize=2"}

Each option means the following.
chunkSize=1: each chunk processes one row.
poolSize=2: two threads are created in the thread pool.
With these settings, processing 10 rows means the 2 threads handle 5 chunks each.
Of course, if one thread takes a long time on its items, the other may end up processing more.
What happens when we run the test code above?
As shown below, the 2 threads read and write each page concurrently.

[Image: paging-test-1]

What if this were the previous single-threaded model?
Then reading and writing would complete for one page before the next page proceeds.

[Image: paging-test-2]

We used JpaPagingItemReader as the example, but other PagingItemReaders can be used in the same way.

[Image: jdbcpaging (JdbcPagingItemReader)]

PagingItemReaders are rather convenient: you only need to plug in a thread pool.
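For instance, a JdbcPagingItemReader version of the reader bean above might look like the following sketch. This is not from the original post; it assumes a javax.sql.DataSource field injected into the configuration class, as in the cursor example later in this post:

@Bean(name = JOB_NAME + "_reader")
@StepScope
public JdbcPagingItemReader<Product> reader(@Value("#{jobParameters[createDate]}") String createDate) {
    Map<String, Object> params = new HashMap<>();
    params.put("createDate", createDate);

    return new JdbcPagingItemReaderBuilder<Product>()
            .name(JOB_NAME + "_reader")
            .dataSource(dataSource) // assumption: injected DataSource
            .pageSize(chunkSize)
            .fetchSize(chunkSize)
            .rowMapper(new BeanPropertyRowMapper<>(Product.class))
            .selectClause("SELECT id, name, price, create_date, status")
            .fromClause("FROM product")
            .whereClause("WHERE create_date = :createDate")
            .sortKeys(Collections.singletonMap("id", Order.ASCENDING))
            .parameterValues(params)
            .saveState(false) // still required for multi-threaded execution
            .build();
}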
Now, let's see what to do with cursor-based readers, which are not thread-safe.

3. CursorItemReader
CursorItemReaders, which read data through a JDBC ResultSet (JdbcCursorItemReader among them), are not thread-safe.

[Image: javadoc2]

(You cannot find the words "thread-safe" anywhere in the Javadoc.)

To make a non-thread-safe Reader thread-safe as described above, its read() calls must be synchronized.

However, if you do this, the Reader no longer works multi-threaded; it reads data sequentially.
Even so, with a synchronized Reader the Processor and Writer still run multi-threaded.

In general, a batch job consumes more resources and time during the Write phase.
That is why so much attention goes to techniques such as bulk insert.
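As a rough illustration (not part of the original post), a JdbcBatchItemWriter flushes each chunk as one batched insert, which is a common way to relieve the Write phase. The table and column names here are assumptions modeled on the Product fields used in this post:

// Sketch: bulk-inserting the backups with JdbcBatchItemWriter.
@Bean(name = JOB_NAME + "_jdbcWriter")
@StepScope
public JdbcBatchItemWriter<ProductBackup> jdbcWriter() {
    return new JdbcBatchItemWriterBuilder<ProductBackup>()
            .dataSource(dataSource) // assumes an injected DataSource
            .sql("INSERT INTO product_backup (name, price, create_date, status) "
                    + "VALUES (:name, :price, :createDate, :status)")
            .beanMapped() // binds :name, :price, ... from ProductBackup getters
            .build();
}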

How can we add synchronized to a JdbcCursorItemReader or HibernateCursorItemReader whose implementation already exists?

(JpaCursorItemReader will be added in Spring Batch 4.3.)

The easiest way is to wrap the reader in SynchronizedItemStreamReader, available since Spring Batch 4.0.

Now, let's look at example code that confirms CursorItemReader is not thread-safe, and then fix it.

3-1. Non-Thread-Safe Code
First, here is JdbcCursorItemReader used in a multi-threaded environment.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class MultiThreadCursorConfiguration {
    public static final String JOB_NAME = "multiThreadCursorBatch";

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;
    private final DataSource dataSource;

    private int chunkSize;

    @Value("${chunkSize:1000}")
    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    private int poolSize;

    @Value("${poolSize:10}")
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    @Bean(name = JOB_NAME+"taskPool")
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(poolSize);
        executor.setThreadNamePrefix("multi-thread-");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        executor.initialize();
        return executor;
    }

    @Bean(name = JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .start(step())
                .preventRestart()
                .build();
    }

    @Bean(name = JOB_NAME +"_step")
    @JobScope
    public Step step() {
        return stepBuilderFactory.get(JOB_NAME +"_step")
                .<Product, ProductBackup>chunk(chunkSize)
                .reader(reader(null))
                .listener(new CursorItemReaderListener()) // (1)
                .processor(processor())
                .writer(writer())
                .taskExecutor(executor())
                .throttleLimit(poolSize)
                .build();
    }

    @Bean(name = JOB_NAME +"_reader")
    @StepScope
    public JdbcCursorItemReader<Product> reader(@Value("#{jobParameters[createDate]}") String createDate) {
        String sql = "SELECT id, name, price, create_date, status FROM product WHERE create_date=':createDate'"
                .replace(":createDate", createDate);

        return new JdbcCursorItemReaderBuilder<Product>() // (2)
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Product.class))
                .sql(sql)
                .name(JOB_NAME +"_reader")
                .build();
    }

    private ItemProcessor<Product, ProductBackup> processor() {
        return item -> {
            log.info("Processing Start Item id={}", item.getId());
            Thread.sleep(1000); // (3)
            log.info("Processing End Item id={}", item.getId());
            return new ProductBackup(item);
        };
    }

    @Bean(name = JOB_NAME +"_writer")
    @StepScope
    public JpaItemWriter<ProductBackup> writer() {
        return new JpaItemWriterBuilder<ProductBackup>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }
}
(1) .listener(new CursorItemReaderListener())

JdbcCursorItemReader does not log anything on its own while reading data.
A listener is added to make it easy to see that reads happen across multiple threads.
(2) JdbcCursorItemReaderBuilder

The code is the same as the JpaPagingItemReader example; only the Reader section changes.
(3) Thread.sleep(1000);

To clearly distinguish whether the step really runs multi-threaded, each thread sleeps for 1 second in the processor stage.
If items are processed too quickly, the difference between multi-threaded and single-threaded execution is hard to see, so the delay is intentional.
Now, let's verify the code above with a test.

@ExtendWith(SpringExtension.class)
@SpringBatchTest
@SpringBootTest(classes={MultiThreadCursorConfiguration.class, TestBatchConfig.class})
@TestPropertySource(properties = {"chunkSize=1", "poolSize=5"})
public class MultiThreadCursorConfigurationTest {

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private ProductBackupRepository productBackupRepository;

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @AfterEach
    void after() {
        productRepository.deleteAll();
        productBackupRepository.deleteAll();
    }

    @Test
    public void Cursor_분산처리_된다() throws Exception {
        //given
        LocalDate createDate = LocalDate.of(2020,4,13);
        ProductStatus status = ProductStatus.APPROVE;
        long price = 1000L;
        for (int i = 0; i < 10; i++) {
            productRepository.save(Product.builder()
                    .price(i * price)
                    .createDate(createDate)
                    .status(status)
                    .build());
        }

        JobParameters jobParameters = new JobParametersBuilder()
                .addString("createDate", createDate.toString())
                .toJobParameters();
        //when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);

        //then
        assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        List<ProductBackup> backups = productBackupRepository.findAll();
        backups.sort(Comparator.comparingLong(ProductBackup::getPrice));

        assertThat(backups).hasSize(10);
        assertThat(backups.get(0).getPrice()).isEqualTo(0L);
        assertThat(backups.get(9).getPrice()).isEqualTo(9000L);
    }

}
The test verifies that 10 rows were transferred to the backup table.
If the reader is not thread-safe, we should see some count other than 10.

Now, run the test once:

[Image: cursor-test-1]

Sure enough, the count is something other than ten.

If you inspect the 12 saved rows, you can see several duplicates of the same data.

[Image: cursor-test-2]

Through the registered listener you can also confirm that every thread (5, matching the configured pool size) began reading the row with the same ID.

[Image: cursor-test-3]

Now that we have confirmed the problem with the current code, let's fix it right away.

3-3. Thread-Safe Code
To make the code thread-safe, you simply wrap the Reader with SynchronizedItemStreamReader.


@Bean(name = JOB_NAME +"_reader")
@StepScope
public SynchronizedItemStreamReader<Product> reader(@Value("#{jobParameters[createDate]}") String createDate) {
String sql = "SELECT id, name, price, create_date, status FROM product WHERE create_date=':createDate'"
        .replace(":createDate", createDate);

JdbcCursorItemReader<Product> itemReader = new JdbcCursorItemReaderBuilder<Product>()
        .fetchSize(chunkSize)
        .dataSource(dataSource)
        .rowMapper(new BeanPropertyRowMapper<>(Product.class))
        .sql(sql)
        .name(JOB_NAME + "_reader")
        .build();

return new SynchronizedItemStreamReaderBuilder<Product>()
        .delegate(itemReader) // (1)
        .build();
}
(1) .delegate(itemReader)

Register the ItemReader you want to wrap as the delegate.
The wrapped reader is then called through a synchronized method, enabling synchronized reads, as the snippet below shows.
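For reference, the delegating method inside Spring Batch's SynchronizedItemStreamReader is essentially the following (simplified from the framework source):

// Every read() passes through the object monitor,
// so only one thread reads from the delegate at a time.
public synchronized T read() throws Exception {
    return this.delegate.read();
}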

SynchronizedItemStreamReader is supported from Spring Batch 4.0.
If you are on a lower version, you can copy the SynchronizedItemStreamReader class code into your own project.

What happens if we run the test again after switching to SynchronizedItemStreamReader?
The test now passes successfully.

[Image: cursor-test-4]

The execution log also shows that it ran correctly in a multi-threaded environment.

[Image: cursor-test-5]

This confirms that the cursor-based reader now works correctly in a multi-threaded step.

Wrap-up
Now, can multi-threading solve every slow batch job?
It cannot.
If server resources such as network, disk I/O, CPU, or memory have already hit their limits even single-threaded, you cannot expect multi-threading to improve performance.

Multi-threading involves many considerations.
So when applying it to a real production environment, it is recommended to study the official Spring documentation, test thoroughly, and only then run it.
