1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.step.factory;
17
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.springframework.batch.core.ChunkListener;
21 import org.springframework.batch.core.ItemProcessListener;
22 import org.springframework.batch.core.ItemReadListener;
23 import org.springframework.batch.core.ItemWriteListener;
24 import org.springframework.batch.core.Step;
25 import org.springframework.batch.core.StepExecutionListener;
26 import org.springframework.batch.core.StepListener;
27 import org.springframework.batch.core.repository.JobRepository;
28 import org.springframework.batch.core.step.builder.SimpleStepBuilder;
29 import org.springframework.batch.core.step.builder.StepBuilder;
30 import org.springframework.batch.core.step.tasklet.TaskletStep;
31 import org.springframework.batch.item.ItemProcessor;
32 import org.springframework.batch.item.ItemReader;
33 import org.springframework.batch.item.ItemStream;
34 import org.springframework.batch.item.ItemWriter;
35 import org.springframework.batch.repeat.CompletionPolicy;
36 import org.springframework.batch.repeat.RepeatOperations;
37 import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
38 import org.springframework.batch.repeat.exception.ExceptionHandler;
39 import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
40 import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
41 import org.springframework.beans.factory.BeanNameAware;
42 import org.springframework.beans.factory.FactoryBean;
43 import org.springframework.core.task.TaskExecutor;
44 import org.springframework.transaction.PlatformTransactionManager;
45 import org.springframework.transaction.annotation.Isolation;
46 import org.springframework.transaction.annotation.Propagation;
47 import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
48 import org.springframework.transaction.interceptor.TransactionAttribute;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 @SuppressWarnings("rawtypes")
64 public class SimpleStepFactoryBean<T, S> implements FactoryBean, BeanNameAware {
65
66 private String name;
67
68 private int startLimit = Integer.MAX_VALUE;
69
70 private boolean allowStartIfComplete;
71
72 private ItemReader<? extends T> itemReader;
73
74 private ItemProcessor<? super T, ? extends S> itemProcessor;
75
76 private ItemWriter<? super S> itemWriter;
77
78 private PlatformTransactionManager transactionManager;
79
80 private Propagation propagation = Propagation.REQUIRED;
81
82 private Isolation isolation = Isolation.DEFAULT;
83
84 private int transactionTimeout = DefaultTransactionAttribute.TIMEOUT_DEFAULT;
85
86 private JobRepository jobRepository;
87
88 private boolean singleton = true;
89
90 private ItemStream[] streams = new ItemStream[0];
91
92 private StepListener[] listeners = new StepListener[0];
93
94 protected final Log logger = LogFactory.getLog(getClass());
95
96 private int commitInterval = 0;
97
98 private TaskExecutor taskExecutor;
99
100 private RepeatOperations stepOperations;
101
102 private RepeatOperations chunkOperations;
103
104 private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
105
106 private CompletionPolicy chunkCompletionPolicy;
107
108 private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT;
109
110 private boolean isReaderTransactionalQueue = false;
111
112
113
114
115 public SimpleStepFactoryBean() {
116 super();
117 }
118
119
120
121
122
123
124
125 public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) {
126 this.isReaderTransactionalQueue = isReaderTransactionalQueue;
127 }
128
129
130
131
132
133 protected boolean isReaderTransactionalQueue() {
134 return isReaderTransactionalQueue;
135 }
136
137
138
139
140
141
142 @Override
143 public void setBeanName(String name) {
144 this.name = name;
145 }
146
147
148
149
150
151 public String getName() {
152 return name;
153 }
154
155
156
157
158
159
160 public void setTransactionTimeout(int transactionTimeout) {
161 this.transactionTimeout = transactionTimeout;
162 }
163
164
165
166
167 public void setPropagation(Propagation propagation) {
168 this.propagation = propagation;
169 }
170
171
172
173
174 public void setIsolation(Isolation isolation) {
175 this.isolation = isolation;
176 }
177
178
179
180
181
182
183 public void setStartLimit(int startLimit) {
184 this.startLimit = startLimit;
185 }
186
187
188
189
190
191
192
193 public void setAllowStartIfComplete(boolean allowStartIfComplete) {
194 this.allowStartIfComplete = allowStartIfComplete;
195 }
196
197
198
199
200 public void setItemReader(ItemReader<? extends T> itemReader) {
201 this.itemReader = itemReader;
202 }
203
204
205
206
207 public void setItemWriter(ItemWriter<? super S> itemWriter) {
208 this.itemWriter = itemWriter;
209 }
210
211
212
213
214 public void setItemProcessor(ItemProcessor<? super T, ? extends S> itemProcessor) {
215 this.itemProcessor = itemProcessor;
216 }
217
218
219
220
221
222
223
224 public void setStreams(ItemStream[] streams) {
225 this.streams = streams;
226 }
227
228
229
230
231
232
233
234 public void setListeners(StepListener[] listeners) {
235 this.listeners = listeners;
236 }
237
238
239
240
241
242 protected StepListener[] getListeners() {
243 return listeners;
244 }
245
246
247
248
249
250 protected ItemReader<? extends T> getItemReader() {
251 return itemReader;
252 }
253
254
255
256
257
258 protected ItemWriter<? super S> getItemWriter() {
259 return itemWriter;
260 }
261
262
263
264
265
266 protected ItemProcessor<? super T, ? extends S> getItemProcessor() {
267 return itemProcessor;
268 }
269
270
271
272
273
274
275 public void setJobRepository(JobRepository jobRepository) {
276 this.jobRepository = jobRepository;
277 }
278
279
280
281
282
283
284 public void setTransactionManager(PlatformTransactionManager transactionManager) {
285 this.transactionManager = transactionManager;
286 }
287
288
289
290
291
292 @SuppressWarnings("serial")
293 protected TransactionAttribute getTransactionAttribute() {
294
295 DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
296 attribute.setPropagationBehavior(propagation.value());
297 attribute.setIsolationLevel(isolation.value());
298 attribute.setTimeout(transactionTimeout);
299 return new DefaultTransactionAttribute(attribute) {
300
301
302
303
304
305 @Override
306 public boolean rollbackOn(Throwable ex) {
307 return true;
308 }
309
310 };
311
312 }
313
314
315
316
317
318
319 @Override
320 public final Object getObject() throws Exception {
321 SimpleStepBuilder<T, S> builder = createBuilder(getName());
322 applyConfiguration(builder);
323 TaskletStep step = builder.build();
324 return step;
325 }
326
327 protected SimpleStepBuilder<T, S> createBuilder(String name) {
328 return new SimpleStepBuilder<T, S>(new StepBuilder(name));
329 }
330
331 @Override
332 public Class<TaskletStep> getObjectType() {
333 return TaskletStep.class;
334 }
335
336
337
338
339
340
341
342 @Override
343 public boolean isSingleton() {
344 return this.singleton;
345 }
346
347
348
349
350
351 public void setSingleton(boolean singleton) {
352 this.singleton = singleton;
353 }
354
355
356
357
358
359
360 public void setCommitInterval(int commitInterval) {
361 this.commitInterval = commitInterval;
362 }
363
364
365
366
367
368
369
370
371 public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) {
372 this.chunkCompletionPolicy = chunkCompletionPolicy;
373 }
374
375
376
377
378
379 protected RepeatOperations getStepOperations() {
380 return stepOperations;
381 }
382
383
384
385
386
387 public void setStepOperations(RepeatOperations stepOperations) {
388 this.stepOperations = stepOperations;
389 }
390
391
392
393
394
395 public void setChunkOperations(RepeatOperations chunkOperations) {
396 this.chunkOperations = chunkOperations;
397 }
398
399
400
401
402
403 protected RepeatOperations getChunkOperations() {
404 return chunkOperations;
405 }
406
407
408
409
410
411 public void setExceptionHandler(ExceptionHandler exceptionHandler) {
412 this.exceptionHandler = exceptionHandler;
413 }
414
415
416
417
418
419 protected ExceptionHandler getExceptionHandler() {
420 return exceptionHandler;
421 }
422
423
424
425
426
427
428
429 public void setTaskExecutor(TaskExecutor taskExecutor) {
430 this.taskExecutor = taskExecutor;
431 }
432
433
434
435
436
437 protected TaskExecutor getTaskExecutor() {
438 return taskExecutor;
439 }
440
441
442
443
444
445
446 public void setThrottleLimit(int throttleLimit) {
447 this.throttleLimit = throttleLimit;
448 }
449
450 protected void applyConfiguration(SimpleStepBuilder<T, S> builder) {
451
452 builder.reader(itemReader);
453 builder.processor(itemProcessor);
454 builder.writer(itemWriter);
455 for (StepExecutionListener listener : BatchListenerFactoryHelper.<StepExecutionListener> getListeners(
456 listeners, StepExecutionListener.class)) {
457 builder.listener(listener);
458 }
459 for (ChunkListener listener : BatchListenerFactoryHelper.<ChunkListener> getListeners(listeners,
460 ChunkListener.class)) {
461 builder.listener(listener);
462 }
463 for (ItemReadListener<T> listener : BatchListenerFactoryHelper.<ItemReadListener<T>> getListeners(listeners,
464 ItemReadListener.class)) {
465 builder.listener(listener);
466 }
467 for (ItemWriteListener<S> listener : BatchListenerFactoryHelper.<ItemWriteListener<S>> getListeners(listeners,
468 ItemWriteListener.class)) {
469 builder.listener(listener);
470 }
471 for (ItemProcessListener<T, S> listener : BatchListenerFactoryHelper.<ItemProcessListener<T, S>> getListeners(
472 listeners, ItemProcessListener.class)) {
473 builder.listener(listener);
474 }
475 builder.transactionManager(transactionManager);
476 builder.transactionAttribute(getTransactionAttribute());
477 builder.repository(jobRepository);
478 builder.startLimit(startLimit);
479 builder.allowStartIfComplete(allowStartIfComplete);
480 builder.chunk(commitInterval);
481 builder.chunk(chunkCompletionPolicy);
482 builder.chunkOperations(chunkOperations);
483 builder.stepOperations(stepOperations);
484 builder.taskExecutor(taskExecutor);
485 builder.throttleLimit(throttleLimit);
486 builder.exceptionHandler(exceptionHandler);
487 if (isReaderTransactionalQueue) {
488 builder.readerIsTransactionalQueue();
489 }
490 for (ItemStream stream : streams) {
491 builder.stream(stream);
492 }
493
494 }
495 }