1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.step.item;
18
19 import org.springframework.classify.BinaryExceptionClassifier;
20 import org.springframework.classify.Classifier;
21 import org.springframework.batch.core.StepContribution;
22 import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
23 import org.springframework.batch.core.step.skip.NonSkippableReadException;
24 import org.springframework.batch.core.step.skip.SkipException;
25 import org.springframework.batch.core.step.skip.SkipListenerFailedException;
26 import org.springframework.batch.core.step.skip.SkipPolicy;
27 import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
28 import org.springframework.batch.item.ItemReader;
29 import org.springframework.batch.repeat.RepeatOperations;
30
31
32
33
34
35
36
37 public class FaultTolerantChunkProvider<I> extends SimpleChunkProvider<I> {
38
39
40
41
42
43
44
45 public static final int DEFAULT_MAX_SKIPS_ON_READ = 100;
46
47 private SkipPolicy skipPolicy = new LimitCheckingItemSkipPolicy();
48
49 private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
50
51 private int maxSkipsOnRead = DEFAULT_MAX_SKIPS_ON_READ;
52
53 public FaultTolerantChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) {
54 super(itemReader, repeatOperations);
55 }
56
57
58
59
60 public void setMaxSkipsOnRead(int maxSkipsOnRead) {
61 this.maxSkipsOnRead = maxSkipsOnRead;
62 }
63
64
65
66
67
68 public void setSkipPolicy(SkipPolicy SkipPolicy) {
69 this.skipPolicy = SkipPolicy;
70 }
71
72
73
74
75
76
77
78
79 public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
80 this.rollbackClassifier = rollbackClassifier;
81 }
82
83 @Override
84 protected I read(StepContribution contribution, Chunk<I> chunk) throws Exception {
85 while (true) {
86 try {
87 return doRead();
88 }
89 catch (Exception e) {
90
91 if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
92
93 contribution.incrementReadSkipCount();
94 chunk.skip(e);
95
96 if (chunk.getErrors().size() >= maxSkipsOnRead) {
97 throw new SkipOverflowException("Too many skips on read");
98 }
99
100 logger.debug("Skipping failed input", e);
101 }
102 else {
103 if (rollbackClassifier.classify(e)) {
104 throw new NonSkippableReadException("Non-skippable exception during read", e);
105 }
106 logger.debug("No-rollback for non-skippable exception (ignored)", e);
107 }
108
109 }
110 }
111 }
112
113 @Override
114 public void postProcess(StepContribution contribution, Chunk<I> chunk) {
115 for (Exception e : chunk.getErrors()) {
116 try {
117 getListener().onSkipInRead(e);
118 }
119 catch (RuntimeException ex) {
120 throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
121 }
122 }
123 }
124
125
126
127
128
129
130
131
132 private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
133 try {
134 return policy.shouldSkip(e, skipCount);
135 }
136 catch (SkipException ex) {
137 throw ex;
138 }
139 catch (RuntimeException ex) {
140 throw new SkipPolicyFailedException("Fatal exception in SkipPolicy.", ex, e);
141 }
142 }
143
144 }