powered by simpleCommunicator - 2.0.49     © 2025 Programmizd 02
Форумы / Java [игнор отключен] [закрыт для гостей] / Как применить партишининг к спринг батч джобе ?
1 сообщений из 1, страница 1 из 1
Как применить партишининг к спринг батч джобе ?
    #39845584
questioner
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Написал простейший код одношаговой джобы: https://github.com/gredwhite/spring-batch-hello-world/tree/master


Теперь хочу применить партишиниг:

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
package spring.boot.hello.world;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import spring.boot.hello.world.batch.DbPersonWriter;
import spring.boot.hello.world.batch.MyPartitioner;
import spring.boot.hello.world.batch.ToLowerCasePersonProcessor;
import spring.boot.hello.world.model.Person;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DbPersonWriter dbPersonWriter;

    @Autowired
    private ToLowerCasePersonProcessor toLowerCasePersonProcessor;

    @Autowired
    private MyPartitioner myPartitioner;

    @Value("${app.users-location}")
    Resource csvResource;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("myJob")
                .incrementer(new RunIdIncrementer())
                .flow(demoPartitionStep())
                .end()
                .build();
    }

    private Step demoPartitionStep() {
        return stepBuilderFactory.get("demoPartitionStep")
                .partitioner("demoPartitionStep", myPartitioner)
                .gridSize(5)
                .step(csvToDataBaseStep())
                .taskExecutor(jobTaskExecutor())
                .build();
    }

    private Step csvToDataBaseStep() {
        return stepBuilderFactory.get("csvToDatabaseStep")
                .<Person, Person>chunk(10)
                .reader(csvPersonReader)
                .processor(toLowerCasePersonProcessor)
                .writer(dbPersonWriter)
                .build();

    }

    @Autowired
    private FlatFileItemReader csvPersonReader;

    @Bean
    @StepScope
    public FlatFileItemReader csvPersonReader() {
        return new FlatFileItemReaderBuilder()
                .name("csvPersonReader")
                .resource(csvResource)
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();

    }

    @Bean
    public TaskExecutor jobTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // there are 21 sites currently hence we have 21 threads
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setCorePoolSize(25);
        taskExecutor.setThreadGroupName("custom-executor");
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
}



Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
@Component
public class MyPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>(gridSize);
        for (int k = 0; k < gridSize; k++) {
            ExecutionContext context = new ExecutionContext();
            context.putString("keyName", "key_" + k); //Depends on what logic you want to use to split
            map.put("PARTITION_KEY" + k, context);
        }
        return map;
    }
}



При таком коде в базе записи дублируются. Причем количество дубликатов равно gridSize


Если убрать @StepScope c csvReader, то всё начинает валиться из-за того, что много потоков работает с одним и тем же файлом:


Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
2019-08-05 19:25:22.303 ERROR 24100 --- [bTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Encountered an error executing step csvToDatabaseStep in job myJob

org.springframework.batch.item.file.NonTransientFlatFileException: Unable to read from resource: [class path resource [users.csv]]
    at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:220) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:92) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.io.IOException: Stream closed
    at java.io.BufferedReader.ensureOpen(BufferedReader.java:122) ~[na:1.8.0_111]
    at java.io.BufferedReader.readLine(BufferedReader.java:317) ~[na:1.8.0_111]
    at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[na:1.8.0_111]
    at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:201) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    ... 26 common frames omitted




и

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
2019-08-05 19:25:22.319 ERROR 24100 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step demoPartitionStep in job myJob

org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:136) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_111]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_111]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_111]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_111]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at com.sun.proxy.$Proxy77.run(Unknown Source) [na:na]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:206) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:180) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:167) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:162) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:779) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:763) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at spring.boot.hello.world.MyApplication.main(MyApplication.java:9) [main/:na]



Как это починить?
...
Рейтинг: 0 / 0
1 сообщений из 1, страница 1 из 1
Форумы / Java [игнор отключен] [закрыт для гостей] / Как применить партишининг к спринг батч джобе ?
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]