1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.step.builder;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.LinkedHashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26
27 import org.springframework.batch.core.ChunkListener;
28 import org.springframework.batch.core.JobInterruptedException;
29 import org.springframework.batch.core.SkipListener;
30 import org.springframework.batch.core.StepListener;
31 import org.springframework.batch.core.listener.StepListenerFactoryBean;
32 import org.springframework.batch.core.scope.context.ChunkContext;
33 import org.springframework.batch.core.step.FatalStepExecutionException;
34 import org.springframework.batch.core.step.item.BatchRetryTemplate;
35 import org.springframework.batch.core.step.item.ChunkMonitor;
36 import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
37 import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor;
38 import org.springframework.batch.core.step.item.FaultTolerantChunkProvider;
39 import org.springframework.batch.core.step.item.ForceRollbackForWriteSkipException;
40 import org.springframework.batch.core.step.item.KeyGenerator;
41 import org.springframework.batch.core.step.item.SimpleRetryExceptionHandler;
42 import org.springframework.batch.core.step.skip.CompositeSkipPolicy;
43 import org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy;
44 import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
45 import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
46 import org.springframework.batch.core.step.skip.NonSkippableReadException;
47 import org.springframework.batch.core.step.skip.SkipLimitExceededException;
48 import org.springframework.batch.core.step.skip.SkipListenerFailedException;
49 import org.springframework.batch.core.step.skip.SkipPolicy;
50 import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
51 import org.springframework.batch.core.step.tasklet.Tasklet;
52 import org.springframework.batch.item.ItemReader;
53 import org.springframework.batch.item.ItemStream;
54 import org.springframework.batch.repeat.RepeatOperations;
55 import org.springframework.batch.repeat.support.RepeatTemplate;
56 import org.springframework.classify.BinaryExceptionClassifier;
57 import org.springframework.classify.Classifier;
58 import org.springframework.classify.SubclassClassifier;
59 import org.springframework.retry.ExhaustedRetryException;
60 import org.springframework.retry.RetryException;
61 import org.springframework.retry.RetryListener;
62 import org.springframework.retry.RetryPolicy;
63 import org.springframework.retry.backoff.BackOffPolicy;
64 import org.springframework.retry.policy.CompositeRetryPolicy;
65 import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
66 import org.springframework.retry.policy.NeverRetryPolicy;
67 import org.springframework.retry.policy.RetryContextCache;
68 import org.springframework.retry.policy.SimpleRetryPolicy;
69 import org.springframework.transaction.TransactionException;
70 import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
71 import org.springframework.transaction.interceptor.TransactionAttribute;
72 import org.springframework.util.Assert;
73
74
75
76
77
78
79
80
81
82 public class FaultTolerantStepBuilder<I, O> extends SimpleStepBuilder<I, O> {
83
84 private ChunkMonitor chunkMonitor = new ChunkMonitor();
85
86 private boolean streamIsReader;
87
88 private int retryLimit = 0;
89
90 private BackOffPolicy backOffPolicy;
91
92 private Set<RetryListener> retryListeners = new LinkedHashSet<RetryListener>();
93
94 private RetryPolicy retryPolicy;
95
96 private RetryContextCache retryContextCache;
97
98 private KeyGenerator keyGenerator;
99
100 private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new LinkedHashSet<Class<? extends Throwable>>();
101
102 private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
103
104 private Collection<Class<? extends Throwable>> nonSkippableExceptionClasses = new HashSet<Class<? extends Throwable>>();
105
106 private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
107
108 private Collection<Class<? extends Throwable>> nonRetryableExceptionClasses = new HashSet<Class<? extends Throwable>>();
109
110 private Set<SkipListener<? super I, ? super O>> skipListeners = new LinkedHashSet<SkipListener<? super I, ? super O>>();
111
112 private int skipLimit = 0;
113
114 private SkipPolicy skipPolicy;
115
116 private boolean processorTransactional = true;
117
118
119
120
121
122
123 public FaultTolerantStepBuilder(StepBuilderHelper<?> parent) {
124 super(parent);
125 }
126
127
128
129
130
131
132 protected FaultTolerantStepBuilder(SimpleStepBuilder<I, O> parent) {
133 super(parent);
134 }
135
136
137
138
139
140
141 @Override
142 protected Tasklet createTasklet() {
143 Assert.state(getReader() != null, "ItemReader must be provided");
144 Assert.state(getProcessor() != null || getWriter() != null, "ItemWriter or ItemProcessor must be provided");
145 addSpecialExceptions();
146 registerSkipListeners();
147 FaultTolerantChunkProvider<I> chunkProvider = createChunkProvider();
148 FaultTolerantChunkProcessor<I, O> chunkProcessor = createChunkProcessor();
149 ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor);
150 tasklet.setBuffering(!isReaderTransactionalQueue());
151 return tasklet;
152 }
153
154
155
156
157
158
159
160 public FaultTolerantStepBuilder<I, O> listener(SkipListener<? super I, ? super O> listener) {
161 skipListeners.add(listener);
162 return this;
163 }
164
165 @Override
166 public FaultTolerantStepBuilder<I, O> listener(ChunkListener listener) {
167 super.listener(new TerminateOnExceptionChunkListenerDelegate(listener));
168 return this;
169 }
170
171 @Override
172 public AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> transactionAttribute(
173 TransactionAttribute transactionAttribute) {
174 return super.transactionAttribute(getTransactionAttribute(transactionAttribute));
175 }
176
177
178
179
180
181
182
183 public FaultTolerantStepBuilder<I, O> listener(RetryListener listener) {
184 retryListeners.add(listener);
185 return this;
186 }
187
188
189
190
191
192
193
194
195
196
197
198 public FaultTolerantStepBuilder<I, O> keyGenerator(KeyGenerator keyGenerator) {
199 this.keyGenerator = keyGenerator;
200 return this;
201 }
202
203
204
205
206
207
208
209
210 public FaultTolerantStepBuilder<I, O> retryLimit(int retryLimit) {
211 this.retryLimit = retryLimit;
212 return this;
213 }
214
215
216
217
218
219
220
221
222 public FaultTolerantStepBuilder<I, O> retryPolicy(RetryPolicy retryPolicy) {
223 this.retryPolicy = retryPolicy;
224 return this;
225 }
226
227
228
229
230
231
232
233
234
235 public FaultTolerantStepBuilder<I, O> backOffPolicy(BackOffPolicy backOffPolicy) {
236 this.backOffPolicy = backOffPolicy;
237 return this;
238 }
239
240
241
242
243
244
245
246
247
248 public FaultTolerantStepBuilder<I, O> retryContextCache(RetryContextCache retryContextCache) {
249 this.retryContextCache = retryContextCache;
250 return this;
251 }
252
253
254
255
256
257
258
259
260 public FaultTolerantStepBuilder<I, O> skipLimit(int skipLimit) {
261 this.skipLimit = skipLimit;
262 return this;
263 }
264
265
266
267
268
269
270
271 public FaultTolerantStepBuilder<I, O> noSkip(Class<? extends Throwable> type) {
272 skippableExceptionClasses.put(type, false);
273 return this;
274 }
275
276
277
278
279
280
281
282 public FaultTolerantStepBuilder<I, O> skip(Class<? extends Throwable> type) {
283 skippableExceptionClasses.put(type, true);
284 return this;
285 }
286
287
288
289
290
291
292
293
294 public FaultTolerantStepBuilder<I, O> skipPolicy(SkipPolicy skipPolicy) {
295 this.skipPolicy = skipPolicy;
296 return this;
297 }
298
299
300
301
302
303
304
305
306
307 public FaultTolerantStepBuilder<I, O> noRollback(Class<? extends Throwable> type) {
308 noRollbackExceptionClasses.add(type);
309 return this;
310 }
311
312
313
314
315
316
317
318 public FaultTolerantStepBuilder<I, O> noRetry(Class<? extends Throwable> type) {
319 retryableExceptionClasses.put(type, false);
320 return this;
321 }
322
323
324
325
326
327
328
329 public FaultTolerantStepBuilder<I, O> retry(Class<? extends Throwable> type) {
330 retryableExceptionClasses.put(type, true);
331 return this;
332 }
333
334
335
336
337
338
339
340
341 public FaultTolerantStepBuilder<I, O> processorNonTransactional() {
342 this.processorTransactional = false;
343 return this;
344 }
345
346 @Override
347 public AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> stream(ItemStream stream) {
348 if (stream instanceof ItemReader<?>) {
349 if (!streamIsReader) {
350 streamIsReader = true;
351 super.stream(chunkMonitor);
352 }
353
354
355 chunkMonitor.registerItemStream(stream);
356 }
357 else {
358 super.stream(stream);
359 }
360 return this;
361 }
362
363 private FaultTolerantChunkProvider<I> createChunkProvider() {
364
365 SkipPolicy readSkipPolicy = createSkipPolicy();
366 readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy);
367 FaultTolerantChunkProvider<I> chunkProvider = new FaultTolerantChunkProvider<I>(getReader(),
368 createChunkOperations());
369 chunkProvider.setMaxSkipsOnRead(Math.max(getChunkSize(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ));
370 chunkProvider.setSkipPolicy(readSkipPolicy);
371 chunkProvider.setRollbackClassifier(getRollbackClassifier());
372 ArrayList<StepListener> listeners = new ArrayList<StepListener>(getItemListeners());
373 listeners.addAll(skipListeners);
374 chunkProvider.setListeners(listeners);
375
376 return chunkProvider;
377
378 }
379
380 private FaultTolerantChunkProcessor<I, O> createChunkProcessor() {
381
382 BatchRetryTemplate batchRetryTemplate = createRetryOperations();
383
384 FaultTolerantChunkProcessor<I, O> chunkProcessor = new FaultTolerantChunkProcessor<I, O>(getProcessor(),
385 getWriter(), batchRetryTemplate);
386 chunkProcessor.setBuffering(!isReaderTransactionalQueue());
387 chunkProcessor.setProcessorTransactional(processorTransactional);
388
389 SkipPolicy writeSkipPolicy = createSkipPolicy();
390 writeSkipPolicy = getFatalExceptionAwareProxy(writeSkipPolicy);
391 chunkProcessor.setWriteSkipPolicy(writeSkipPolicy);
392 chunkProcessor.setProcessSkipPolicy(writeSkipPolicy);
393 chunkProcessor.setRollbackClassifier(getRollbackClassifier());
394 chunkProcessor.setKeyGenerator(keyGenerator);
395 detectStreamInReader();
396
397 ArrayList<StepListener> listeners = new ArrayList<StepListener>(getItemListeners());
398 listeners.addAll(skipListeners);
399 chunkProcessor.setListeners(listeners);
400 chunkProcessor.setChunkMonitor(chunkMonitor);
401
402 return chunkProcessor;
403
404 }
405
406 @SuppressWarnings("unchecked")
407 private void addSpecialExceptions() {
408 addNonSkippableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
409 SkipListenerFailedException.class, SkipPolicyFailedException.class, RetryException.class,
410 JobInterruptedException.class, Error.class);
411 addNonRetryableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
412 TransactionException.class, FatalStepExecutionException.class, SkipListenerFailedException.class,
413 SkipPolicyFailedException.class, RetryException.class, JobInterruptedException.class, Error.class);
414 }
415
416 private void detectStreamInReader() {
417 if (streamIsReader) {
418 if (!concurrent()) {
419 chunkMonitor.setItemReader(getReader());
420 }
421 else {
422 logger.warn("Asynchronous TaskExecutor detected with ItemStream reader. This is probably an error, "
423 + "and may lead to incorrect restart data being stored.");
424 }
425 }
426 }
427
428
429
430
431 private void registerSkipListeners() {
432
433
434 for (Object itemHandler : new Object[] { getReader(), getWriter(), getProcessor() }) {
435
436 if (StepListenerFactoryBean.isListener(itemHandler)) {
437 StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
438 if (listener instanceof SkipListener<?, ?>) {
439 @SuppressWarnings("unchecked")
440 SkipListener<? super I, ? super O> skipListener = (SkipListener<? super I, ? super O>) listener;
441 skipListeners.add(skipListener);
442 }
443 }
444
445 }
446 }
447
448
449
450
451
452
453 private Classifier<Throwable, Boolean> getRollbackClassifier() {
454
455 Classifier<Throwable, Boolean> classifier = new BinaryExceptionClassifier(noRollbackExceptionClasses, false);
456
457
458
459 if (!classifier.classify(new ForceRollbackForWriteSkipException("test", new RuntimeException()))
460 || !classifier.classify(new ExhaustedRetryException("test"))) {
461
462 final Classifier<Throwable, Boolean> binary = classifier;
463
464 Collection<Class<? extends Throwable>> types = new HashSet<Class<? extends Throwable>>();
465 types.add(ForceRollbackForWriteSkipException.class);
466 types.add(ExhaustedRetryException.class);
467 final Classifier<Throwable, Boolean> panic = new BinaryExceptionClassifier(types, true);
468
469 classifier = new Classifier<Throwable, Boolean>() {
470 @Override
471 public Boolean classify(Throwable classifiable) {
472
473 return panic.classify(classifiable) || binary.classify(classifiable);
474 }
475 };
476
477 }
478
479 return classifier;
480
481 }
482
483 @SuppressWarnings("serial")
484 private TransactionAttribute getTransactionAttribute(TransactionAttribute attribute) {
485
486 final Classifier<Throwable, Boolean> classifier = getRollbackClassifier();
487 return new DefaultTransactionAttribute(attribute) {
488 @Override
489 public boolean rollbackOn(Throwable ex) {
490 return classifier.classify(ex);
491 }
492
493 };
494
495 }
496
497 protected SkipPolicy createSkipPolicy() {
498 SkipPolicy skipPolicy = this.skipPolicy;
499 Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(
500 skippableExceptionClasses);
501 map.put(ForceRollbackForWriteSkipException.class, true);
502 LimitCheckingItemSkipPolicy limitCheckingItemSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, map);
503 if (skipPolicy == null) {
504 Assert.state(!(skippableExceptionClasses.isEmpty() && skipLimit > 0),
505 "If a skip limit is provided then skippable exceptions must also be specified");
506 skipPolicy = limitCheckingItemSkipPolicy;
507 }
508 else if (limitCheckingItemSkipPolicy != null) {
509 skipPolicy = new CompositeSkipPolicy(new SkipPolicy[] { skipPolicy, limitCheckingItemSkipPolicy });
510 }
511 return skipPolicy;
512 }
513
514
515
516
517 private BatchRetryTemplate createRetryOperations() {
518
519 RetryPolicy retryPolicy = this.retryPolicy;
520 SimpleRetryPolicy simpleRetryPolicy = null;
521
522 Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(
523 retryableExceptionClasses);
524 map.put(ForceRollbackForWriteSkipException.class, true);
525 simpleRetryPolicy = new SimpleRetryPolicy(retryLimit, map);
526
527 if (retryPolicy == null) {
528 Assert.state(!(retryableExceptionClasses.isEmpty() && retryLimit > 0),
529 "If a retry limit is provided then retryable exceptions must also be specified");
530 retryPolicy = simpleRetryPolicy;
531 }
532 else if ((!retryableExceptionClasses.isEmpty() && retryLimit > 0)) {
533 CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
534 compositeRetryPolicy.setPolicies(new RetryPolicy[] { retryPolicy, simpleRetryPolicy });
535 retryPolicy = compositeRetryPolicy;
536 }
537
538 RetryPolicy retryPolicyWrapper = getFatalExceptionAwareProxy(retryPolicy);
539
540 BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate();
541 if (backOffPolicy != null) {
542 batchRetryTemplate.setBackOffPolicy(backOffPolicy);
543 }
544 batchRetryTemplate.setRetryPolicy(retryPolicyWrapper);
545
546
547 RepeatOperations stepOperations = getStepOperations();
548 if (stepOperations instanceof RepeatTemplate) {
549 SimpleRetryExceptionHandler exceptionHandler = new SimpleRetryExceptionHandler(retryPolicyWrapper,
550 getExceptionHandler(), nonRetryableExceptionClasses);
551 ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
552 }
553
554 if (retryContextCache != null) {
555 batchRetryTemplate.setRetryContextCache(retryContextCache);
556 }
557
558 if (retryListeners != null) {
559 batchRetryTemplate.setListeners(retryListeners.toArray(new RetryListener[0]));
560 }
561 return batchRetryTemplate;
562
563 }
564
565
566
567
568
569 private RetryPolicy getFatalExceptionAwareProxy(RetryPolicy retryPolicy) {
570
571 NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy();
572 Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<Class<? extends Throwable>, RetryPolicy>();
573 for (Class<? extends Throwable> fatal : nonRetryableExceptionClasses) {
574 map.put(fatal, neverRetryPolicy);
575 }
576
577 SubclassClassifier<Throwable, RetryPolicy> classifier = new SubclassClassifier<Throwable, RetryPolicy>(
578 retryPolicy);
579 classifier.setTypeMap(map);
580
581 ExceptionClassifierRetryPolicy retryPolicyWrapper = new ExceptionClassifierRetryPolicy();
582 retryPolicyWrapper.setExceptionClassifier(classifier);
583 return retryPolicyWrapper;
584
585 }
586
587
588
589
590
591
592
593 private SkipPolicy getFatalExceptionAwareProxy(SkipPolicy skipPolicy) {
594
595 NeverSkipItemSkipPolicy neverSkipPolicy = new NeverSkipItemSkipPolicy();
596 Map<Class<? extends Throwable>, SkipPolicy> map = new HashMap<Class<? extends Throwable>, SkipPolicy>();
597 for (Class<? extends Throwable> fatal : nonSkippableExceptionClasses) {
598 map.put(fatal, neverSkipPolicy);
599 }
600
601 SubclassClassifier<Throwable, SkipPolicy> classifier = new SubclassClassifier<Throwable, SkipPolicy>(skipPolicy);
602 classifier.setTypeMap(map);
603
604 ExceptionClassifierSkipPolicy skipPolicyWrapper = new ExceptionClassifierSkipPolicy();
605 skipPolicyWrapper.setExceptionClassifier(classifier);
606 return skipPolicyWrapper;
607 }
608
609 private void addNonSkippableExceptionIfMissing(Class<? extends Throwable>... cls) {
610 List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>();
611 for (Class<? extends Throwable> exceptionClass : nonSkippableExceptionClasses) {
612 exceptions.add(exceptionClass);
613 }
614 for (Class<? extends Throwable> fatal : cls) {
615 if (!exceptions.contains(fatal)) {
616 exceptions.add(fatal);
617 }
618 }
619 nonSkippableExceptionClasses = exceptions;
620 }
621
622 private void addNonRetryableExceptionIfMissing(Class<? extends Throwable>... cls) {
623 List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>();
624 for (Class<? extends Throwable> exceptionClass : nonRetryableExceptionClasses) {
625 exceptions.add(exceptionClass);
626 }
627 for (Class<? extends Throwable> fatal : cls) {
628 if (!exceptions.contains(fatal)) {
629 exceptions.add(fatal);
630 }
631 }
632 nonRetryableExceptionClasses = exceptions;
633 }
634
635
636
637
638
639
640
641
642
643 private class TerminateOnExceptionChunkListenerDelegate implements ChunkListener {
644
645 private ChunkListener chunkListener;
646
647 TerminateOnExceptionChunkListenerDelegate(ChunkListener chunkListener) {
648 this.chunkListener = chunkListener;
649 }
650
651 @Override
652 public void beforeChunk(ChunkContext context) {
653 try {
654 chunkListener.beforeChunk(context);
655 }
656 catch (Throwable t) {
657 throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
658 }
659 }
660
661 @Override
662 public void afterChunk(ChunkContext context) {
663 try {
664 chunkListener.afterChunk(context);
665 }
666 catch (Throwable t) {
667 throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
668 }
669 }
670
671 @Override
672 public void afterChunkError(ChunkContext context) {
673 try {
674 chunkListener.afterChunkError(context);
675 }
676 catch (Throwable t) {
677 throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
678 }
679 }
680 }
681 }