1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.step.item;
18
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.springframework.batch.core.StepContribution;
22 import org.springframework.batch.core.scope.context.ChunkContext;
23 import org.springframework.batch.core.step.tasklet.Tasklet;
24 import org.springframework.batch.repeat.RepeatStatus;
25
26
27
28
29
30
31
32
33
34 public class ChunkOrientedTasklet<I> implements Tasklet {
35
36 private static final String INPUTS_KEY = "INPUTS";
37
38 private final ChunkProcessor<I> chunkProcessor;
39
40 private final ChunkProvider<I> chunkProvider;
41
42 private boolean buffering = true;
43
44 private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class);
45
46 public ChunkOrientedTasklet(ChunkProvider<I> chunkProvider, ChunkProcessor<I> chunkProcessor) {
47 this.chunkProvider = chunkProvider;
48 this.chunkProcessor = chunkProcessor;
49 }
50
51
52
53
54
55
56
57
58
59 public void setBuffering(boolean buffering) {
60 this.buffering = buffering;
61 }
62
63 @Override
64 public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
65
66 @SuppressWarnings("unchecked")
67 Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
68 if (inputs == null) {
69 inputs = chunkProvider.provide(contribution);
70 if (buffering) {
71 chunkContext.setAttribute(INPUTS_KEY, inputs);
72 }
73 }
74
75 chunkProcessor.process(contribution, inputs);
76 chunkProvider.postProcess(contribution, inputs);
77
78
79
80 if (inputs.isBusy()) {
81 logger.debug("Inputs still busy");
82 return RepeatStatus.CONTINUABLE;
83 }
84
85 chunkContext.removeAttribute(INPUTS_KEY);
86 chunkContext.setComplete();
87
88 logger.debug("Inputs not busy, ended: " + inputs.isEnd());
89 return RepeatStatus.continueIf(!inputs.isEnd());
90
91 }
92
93 }