1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.step.item;
18
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.atomic.AtomicReference;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.springframework.batch.core.StepContribution;
29 import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
30 import org.springframework.batch.core.step.skip.NonSkippableProcessException;
31 import org.springframework.batch.core.step.skip.SkipLimitExceededException;
32 import org.springframework.batch.core.step.skip.SkipListenerFailedException;
33 import org.springframework.batch.core.step.skip.SkipPolicy;
34 import org.springframework.batch.item.ItemProcessor;
35 import org.springframework.batch.item.ItemWriter;
36 import org.springframework.classify.BinaryExceptionClassifier;
37 import org.springframework.classify.Classifier;
38 import org.springframework.retry.ExhaustedRetryException;
39 import org.springframework.retry.RecoveryCallback;
40 import org.springframework.retry.RetryCallback;
41 import org.springframework.retry.RetryContext;
42 import org.springframework.retry.RetryException;
43 import org.springframework.retry.support.DefaultRetryState;
44
45
46
47
48
49
50 public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> {
51
52 private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy();
53
54 private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy();
55
56 private final BatchRetryTemplate batchRetryTemplate;
57
58 private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
59
60 private Log logger = LogFactory.getLog(getClass());
61
62 private boolean buffering = true;
63
64 private KeyGenerator keyGenerator;
65
66 private ChunkMonitor chunkMonitor = new ChunkMonitor();
67
68 private boolean processorTransactional = true;
69
70
71
72
73
74
75
76
77 public void setKeyGenerator(KeyGenerator keyGenerator) {
78 this.keyGenerator = keyGenerator;
79 }
80
81
82
83
84 public void setProcessSkipPolicy(SkipPolicy SkipPolicy) {
85 this.itemProcessSkipPolicy = SkipPolicy;
86 }
87
88
89
90
91 public void setWriteSkipPolicy(SkipPolicy SkipPolicy) {
92 this.itemWriteSkipPolicy = SkipPolicy;
93 }
94
95
96
97
98
99
100
101 public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
102 this.rollbackClassifier = rollbackClassifier;
103 }
104
105
106
107
108 public void setChunkMonitor(ChunkMonitor chunkMonitor) {
109 this.chunkMonitor = chunkMonitor;
110 }
111
112
113
114
115
116
117
118
119
120 public void setBuffering(boolean buffering) {
121 this.buffering = buffering;
122 }
123
124
125
126
127
128
129
130
131 public void setProcessorTransactional(boolean processorTransactional) {
132 this.processorTransactional = processorTransactional;
133 }
134
135 public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor,
136 ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) {
137 super(itemProcessor, itemWriter);
138 this.batchRetryTemplate = batchRetryTemplate;
139 }
140
141 @Override
142 protected void initializeUserData(Chunk<I> inputs) {
143 @SuppressWarnings("unchecked")
144 UserData<O> data = (UserData<O>) inputs.getUserData();
145 if (data == null) {
146 data = new UserData<O>();
147 inputs.setUserData(data);
148 data.setOutputs(new Chunk<O>());
149 }
150 }
151
152 @Override
153 protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
154 @SuppressWarnings("unchecked")
155 UserData<O> data = (UserData<O>) inputs.getUserData();
156 return data.filterCount;
157 }
158
159 @Override
160 protected boolean isComplete(Chunk<I> inputs) {
161
162
163
164
165
166
167
168
169
170
171 @SuppressWarnings("unchecked")
172 UserData<O> data = (UserData<O>) inputs.getUserData();
173 Chunk<O> previous = data.getOutputs();
174
175 return inputs.isEmpty() && previous.getSkips().isEmpty();
176
177 }
178
179 @Override
180 protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
181
182 @SuppressWarnings("unchecked")
183 UserData<O> data = (UserData<O>) inputs.getUserData();
184 Chunk<O> previous = data.getOutputs();
185
186 Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips());
187 next.setBusy(previous.isBusy());
188
189
190 data.setOutputs(next);
191
192 return next;
193
194 }
195
196 @Override
197 protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception {
198
199 Chunk<O> outputs = new Chunk<O>();
200 @SuppressWarnings("unchecked")
201 final UserData<O> data = (UserData<O>) inputs.getUserData();
202 final Chunk<O> cache = data.getOutputs();
203 final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator();
204 final AtomicInteger count = new AtomicInteger(0);
205
206
207
208
209 for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
210
211 final I item = iterator.next();
212
213 RetryCallback<O> retryCallback = new RetryCallback<O>() {
214
215 @Override
216 public O doWithRetry(RetryContext context) throws Exception {
217 O output = null;
218 try {
219 count.incrementAndGet();
220 O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
221 if (cached != null && !processorTransactional) {
222 output = cached;
223 }
224 else {
225 output = doProcess(item);
226 if (!processorTransactional && !data.scanning()) {
227 cache.add(output);
228 }
229 }
230 }
231 catch (Exception e) {
232 if (rollbackClassifier.classify(e)) {
233
234
235 throw e;
236 }
237 else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
238
239
240 contribution.incrementProcessSkipCount();
241 logger.debug("Skipping after failed process with no rollback", e);
242
243
244 callProcessSkipListener(item, e);
245 }
246 else {
247
248
249
250 throw new NonSkippableProcessException(
251 "Non-skippable exception in processor. Make sure any exceptions that do not cause a rollback are skippable.",
252 e);
253 }
254 }
255 if (output == null) {
256
257 iterator.remove();
258 data.incrementFilterCount();
259 }
260 return output;
261 }
262
263 };
264
265 RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
266
267 @Override
268 public O recover(RetryContext context) throws Exception {
269 Throwable e = context.getLastThrowable();
270 if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
271 iterator.remove(e);
272 contribution.incrementProcessSkipCount();
273 logger.debug("Skipping after failed process", e);
274 return null;
275 }
276 else {
277 if (rollbackClassifier.classify(e)) {
278
279
280 throw new RetryException("Non-skippable exception in recoverer while processing", e);
281 }
282 iterator.remove(e);
283 return null;
284 }
285 }
286
287 };
288
289 O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
290 getInputKey(item), rollbackClassifier));
291 if (output != null) {
292 outputs.add(output);
293 }
294
295
296
297
298
299 if (data.scanning()) {
300 while (cacheIterator != null && cacheIterator.hasNext()) {
301 outputs.add(cacheIterator.next());
302 }
303
304 break;
305 }
306 }
307
308 return outputs;
309
310 }
311
312 @Override
313 protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs)
314 throws Exception {
315
316 @SuppressWarnings("unchecked")
317 final UserData<O> data = (UserData<O>) inputs.getUserData();
318 final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();
319
320 RetryCallback<Object> retryCallback = new RetryCallback<Object>() {
321 @Override
322 public Object doWithRetry(RetryContext context) throws Exception {
323
324 contextHolder.set(context);
325
326 if (!data.scanning()) {
327 chunkMonitor.setChunkSize(inputs.size());
328 try {
329 doWrite(outputs.getItems());
330 }
331 catch (Exception e) {
332 if (rollbackClassifier.classify(e)) {
333 throw e;
334 }
335
336
337
338
339
340
341 throw new ForceRollbackForWriteSkipException(
342 "Force rollback on skippable exception so that skipped item can be located.", e);
343 }
344 contribution.incrementWriteCount(outputs.size());
345 }
346 else {
347 scan(contribution, inputs, outputs, chunkMonitor, false);
348 }
349 return null;
350
351 }
352 };
353
354 if (!buffering) {
355
356 RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {
357
358 @Override
359 public Object recover(RetryContext context) throws Exception {
360
361 Throwable e = context.getLastThrowable();
362 if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
363 throw new RetryException("Invalid retry state during write caused by "
364 + "exception that does not classify for rollback: ", e);
365 }
366
367 Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
368 for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {
369
370 inputIterator.next();
371 outputIterator.next();
372
373 checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
374 if (!rollbackClassifier.classify(e)) {
375 throw new RetryException(
376 "Invalid retry state during recovery caused by exception that does not classify for rollback: ",
377 e);
378 }
379
380 }
381
382 return null;
383
384 }
385
386 };
387
388 batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
389 BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));
390
391 }
392 else {
393
394 RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
395
396 @Override
397 public Object recover(RetryContext context) throws Exception {
398
399
400
401
402
403
404 if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
405 throw new ExhaustedRetryException(
406 "Retry exhausted after last attempt in recovery path, but exception is not skippable.",
407 context.getLastThrowable());
408 }
409
410 inputs.setBusy(true);
411 data.scanning(true);
412 scan(contribution, inputs, outputs, chunkMonitor, true);
413 return null;
414 }
415
416 };
417
418 if (logger.isDebugEnabled()) {
419 logger.debug("Attempting to write: " + inputs);
420 }
421 try {
422 batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
423 rollbackClassifier));
424 }
425 catch (Exception e) {
426 RetryContext context = contextHolder.get();
427 if (!batchRetryTemplate.canRetry(context)) {
428
429
430
431
432
433 data.scanning(true);
434 }
435 throw e;
436 }
437
438 }
439
440 callSkipListeners(inputs, outputs);
441
442 }
443
444 private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) {
445
446 for (SkipWrapper<I> wrapper : inputs.getSkips()) {
447 I item = wrapper.getItem();
448 if (item == null) {
449 continue;
450 }
451 Throwable e = wrapper.getException();
452 callProcessSkipListener(item, e);
453 }
454
455 for (SkipWrapper<O> wrapper : outputs.getSkips()) {
456 Throwable e = wrapper.getException();
457 try {
458 getListener().onSkipInWrite(wrapper.getItem(), e);
459 }
460 catch (RuntimeException ex) {
461 throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
462 }
463 }
464
465
466 outputs.clearSkips();
467 inputs.clearSkips();
468
469 }
470
471
472
473
474
475
476
477
478 private void callProcessSkipListener(I item, Throwable e) {
479 try {
480 getListener().onSkipInProcess(item, e);
481 }
482 catch (RuntimeException ex) {
483 throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
484 }
485 }
486
487
488
489
490
491
492
493
494
495 private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
496 try {
497 return policy.shouldSkip(e, skipCount);
498 }
499 catch (SkipLimitExceededException ex) {
500 throw ex;
501 }
502 catch (RuntimeException ex) {
503 throw new SkipListenerFailedException("Fatal exception in SkipPolicy.", ex, e);
504 }
505 }
506
507 private Object getInputKey(I item) {
508 if (keyGenerator == null) {
509 return item;
510 }
511 return keyGenerator.getKey(item);
512 }
513
514 private List<?> getInputKeys(final Chunk<I> inputs) {
515 if (keyGenerator == null) {
516 return inputs.getItems();
517 }
518 List<Object> keys = new ArrayList<Object>();
519 for (I item : inputs.getItems()) {
520 keys.add(keyGenerator.getKey(item));
521 }
522 return keys;
523 }
524
525 private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator,
526 Throwable e, StepContribution contribution, boolean recovery) throws Exception {
527 logger.debug("Checking skip policy after failed write");
528 if (shouldSkip(itemWriteSkipPolicy, e, contribution.getStepSkipCount())) {
529 contribution.incrementWriteSkipCount();
530 inputIterator.remove();
531 outputIterator.remove(e);
532 logger.debug("Skipping after failed write", e);
533 }
534 else {
535 if (recovery) {
536
537 throw new RetryException("Non-skippable exception in recoverer", e);
538 }
539 else {
540 if (e instanceof Exception) {
541 throw (Exception) e;
542 }
543 else if (e instanceof Error) {
544 throw (Error) e;
545 }
546 else {
547 throw new RetryException("Non-skippable throwable in recoverer", e);
548 }
549 }
550 }
551 }
552
553 private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
554 ChunkMonitor chunkMonitor, boolean recovery) throws Exception {
555
556 @SuppressWarnings("unchecked")
557 final UserData<O> data = (UserData<O>) inputs.getUserData();
558
559 if (logger.isDebugEnabled()) {
560 if (recovery) {
561 logger.debug("Scanning for failed item on recovery from write: " + inputs);
562 }
563 else {
564 logger.debug("Scanning for failed item on write: " + inputs);
565 }
566 }
567 if (outputs.isEmpty()) {
568 data.scanning(false);
569 inputs.setBusy(false);
570 return;
571 }
572
573 Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
574 Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
575
576 List<O> items = Collections.singletonList(outputIterator.next());
577 inputIterator.next();
578 try {
579 writeItems(items);
580
581
582 doAfterWrite(items);
583 contribution.incrementWriteCount(1);
584 inputIterator.remove();
585 outputIterator.remove();
586 }
587 catch (Exception e) {
588 doOnWriteError(e, items);
589 if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) {
590 inputIterator.remove();
591 outputIterator.remove();
592 }
593 else {
594 checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery);
595 }
596 if (rollbackClassifier.classify(e)) {
597 throw e;
598 }
599 }
600 chunkMonitor.incrementOffset();
601 if (outputs.isEmpty()) {
602 data.scanning(false);
603 inputs.setBusy(false);
604 chunkMonitor.resetOffset();
605 }
606 }
607
608 private static class UserData<O> {
609
610 private Chunk<O> outputs;
611
612 private int filterCount = 0;
613
614 private boolean scanning;
615
616 public boolean scanning() {
617 return scanning;
618 }
619
620 public void scanning(boolean scanning) {
621 this.scanning = scanning;
622 }
623
624 public void incrementFilterCount() {
625 filterCount++;
626 }
627
628 public Chunk<O> getOutputs() {
629 return outputs;
630 }
631
632 public void setOutputs(Chunk<O> outputs) {
633 this.outputs = outputs;
634 }
635
636 }
637
638 }