1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.step.builder;
17
18 import java.util.ArrayList;
19 import java.util.LinkedHashSet;
20 import java.util.Set;
21
22 import org.springframework.batch.core.ChunkListener;
23 import org.springframework.batch.core.ItemProcessListener;
24 import org.springframework.batch.core.ItemReadListener;
25 import org.springframework.batch.core.ItemWriteListener;
26 import org.springframework.batch.core.StepExecutionListener;
27 import org.springframework.batch.core.StepListener;
28 import org.springframework.batch.core.listener.StepListenerFactoryBean;
29 import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
30 import org.springframework.batch.core.step.item.SimpleChunkProcessor;
31 import org.springframework.batch.core.step.item.SimpleChunkProvider;
32 import org.springframework.batch.core.step.tasklet.Tasklet;
33 import org.springframework.batch.core.step.tasklet.TaskletStep;
34 import org.springframework.batch.item.ItemProcessor;
35 import org.springframework.batch.item.ItemReader;
36 import org.springframework.batch.item.ItemStream;
37 import org.springframework.batch.item.ItemWriter;
38 import org.springframework.batch.repeat.CompletionPolicy;
39 import org.springframework.batch.repeat.RepeatOperations;
40 import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
41 import org.springframework.batch.repeat.support.RepeatTemplate;
42 import org.springframework.util.Assert;
43
44
45
46
47
48
49
50
51
52
53
54
55 public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> {
56
57 private static final int DEFAULT_COMMIT_INTERVAL = 1;
58
59 private ItemReader<? extends I> reader;
60
61 private ItemWriter<? super O> writer;
62
63 private ItemProcessor<? super I, ? extends O> processor;
64
65 private int chunkSize = 0;
66
67 private RepeatOperations chunkOperations;
68
69 private CompletionPolicy completionPolicy;
70
71 private Set<StepListener> itemListeners = new LinkedHashSet<StepListener>();
72
73 private boolean readerTransactionalQueue = false;
74
75
76
77
78
79
80 public SimpleStepBuilder(StepBuilderHelper<?> parent) {
81 super(parent);
82 }
83
84
85
86
87
88
89 protected SimpleStepBuilder(SimpleStepBuilder<I, O> parent) {
90 super(parent);
91 this.chunkSize = parent.chunkSize;
92 this.completionPolicy = parent.completionPolicy;
93 this.chunkOperations = parent.chunkOperations;
94 this.reader = parent.reader;
95 this.writer = parent.writer;
96 this.processor = parent.processor;
97 this.itemListeners = parent.itemListeners;
98 this.readerTransactionalQueue = parent.readerTransactionalQueue;
99 }
100
101 public FaultTolerantStepBuilder<I, O> faultTolerant() {
102 FaultTolerantStepBuilder<I, O> builder = new FaultTolerantStepBuilder<I, O>(this);
103 return builder;
104 }
105
106
107
108
109
110
111 @Override
112 public TaskletStep build() {
113 registerAsStreamsAndListeners(reader, processor, writer);
114 return super.build();
115 }
116
117 @Override
118 protected Tasklet createTasklet() {
119 Assert.state(reader != null, "ItemReader must be provided");
120 Assert.state(processor != null || writer != null, "ItemWriter or ItemProcessor must be provided");
121 RepeatOperations repeatOperations = createChunkOperations();
122 SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<I>(reader, repeatOperations);
123 SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<I, O>(processor, writer);
124 chunkProvider.setListeners(new ArrayList<StepListener>(itemListeners));
125 chunkProcessor.setListeners(new ArrayList<StepListener>(itemListeners));
126 ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor);
127 tasklet.setBuffering(!readerTransactionalQueue);
128 return tasklet;
129 }
130
131
132
133
134
135
136
137
138
139 public SimpleStepBuilder<I, O> chunk(int chunkSize) {
140 Assert.state(completionPolicy == null || chunkSize == 0,
141 "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
142 this.chunkSize = chunkSize;
143 return this;
144 }
145
146
147
148
149
150
151
152
153 public SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
154 Assert.state(chunkSize == 0 || completionPolicy == null,
155 "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
156 this.completionPolicy = completionPolicy;
157 return this;
158 }
159
160
161
162
163
164
165
166
167
168 public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
169 this.reader = reader;
170 return this;
171 }
172
173
174
175
176
177
178
179
180 public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) {
181 this.writer = writer;
182 return this;
183 }
184
185
186
187
188
189
190
191
192 public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) {
193 this.processor = processor;
194 return this;
195 }
196
197
198
199
200
201
202
203
204 public SimpleStepBuilder<I, O> readerIsTransactionalQueue() {
205 this.readerTransactionalQueue = true;
206 return this;
207 }
208
209
210
211
212
213
214
215 public SimpleStepBuilder<I, O> listener(ItemReadListener<? super I> listener) {
216 itemListeners.add(listener);
217 return this;
218 }
219
220
221
222
223
224
225
226 public SimpleStepBuilder<I, O> listener(ItemWriteListener<? super O> listener) {
227 itemListeners.add(listener);
228 return this;
229 }
230
231
232
233
234
235
236
237 public SimpleStepBuilder<I, O> listener(ItemProcessListener<? super I, ? super O> listener) {
238 itemListeners.add(listener);
239 return this;
240 }
241
242
243
244
245
246
247
248
249 public SimpleStepBuilder<I, O> chunkOperations(RepeatOperations repeatTemplate) {
250 this.chunkOperations = repeatTemplate;
251 return this;
252 }
253
254 protected RepeatOperations createChunkOperations() {
255 RepeatOperations repeatOperations = chunkOperations;
256 if (repeatOperations == null) {
257 RepeatTemplate repeatTemplate = new RepeatTemplate();
258 repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy());
259 repeatOperations = repeatTemplate;
260 }
261 return repeatOperations;
262 }
263
264 protected ItemReader<? extends I> getReader() {
265 return reader;
266 }
267
268 protected ItemWriter<? super O> getWriter() {
269 return writer;
270 }
271
272 protected ItemProcessor<? super I, ? extends O> getProcessor() {
273 return processor;
274 }
275
276 protected int getChunkSize() {
277 return chunkSize;
278 }
279
280 protected boolean isReaderTransactionalQueue() {
281 return readerTransactionalQueue;
282 }
283
284 protected Set<StepListener> getItemListeners() {
285 return itemListeners;
286 }
287
288
289
290
291 private CompletionPolicy getChunkCompletionPolicy() {
292 Assert.state(!(completionPolicy != null && chunkSize > 0),
293 "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
294 Assert.state(chunkSize >= 0, "The commitInterval must be positive or zero (for default value).");
295
296 if (completionPolicy != null) {
297 return completionPolicy;
298 }
299 if (chunkSize == 0) {
300 logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")");
301 chunkSize = DEFAULT_COMMIT_INTERVAL;
302 }
303 return new SimpleCompletionPolicy(chunkSize);
304 }
305
306 private void registerAsStreamsAndListeners(ItemReader<? extends I> itemReader,
307 ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
308 for (Object itemHandler : new Object[] { itemReader, itemWriter, itemProcessor }) {
309 if (itemHandler instanceof ItemStream) {
310 stream((ItemStream) itemHandler);
311 }
312 if (StepListenerFactoryBean.isListener(itemHandler)) {
313 StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
314 if (listener instanceof StepExecutionListener) {
315 listener((StepExecutionListener) listener);
316 }
317 if (listener instanceof ChunkListener) {
318 listener((ChunkListener) listener);
319 }
320 if (listener instanceof ItemReadListener<?> || listener instanceof ItemProcessListener<?, ?>
321 || listener instanceof ItemWriteListener<?>) {
322 itemListeners.add(listener);
323 }
324 }
325 }
326 }
327
328 }