1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.step.item;
18
19 import java.util.List;
20
21 import org.springframework.batch.core.StepContribution;
22 import org.springframework.batch.core.StepListener;
23 import org.springframework.batch.core.listener.MulticasterBatchListener;
24 import org.springframework.batch.item.ItemProcessor;
25 import org.springframework.batch.item.ItemWriter;
26 import org.springframework.beans.factory.InitializingBean;
27 import org.springframework.util.Assert;
28
29
30
31
32
33
34
35
36 public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
37
38 private ItemProcessor<? super I, ? extends O> itemProcessor;
39
40 private ItemWriter<? super O> itemWriter;
41
42 private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>();
43
44
45
46
47
48 @SuppressWarnings("unused")
49 private SimpleChunkProcessor() {
50 this(null, null);
51 }
52
53 public SimpleChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
54 this.itemProcessor = itemProcessor;
55 this.itemWriter = itemWriter;
56 }
57
58
59
60
61 public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
62 this.itemProcessor = itemProcessor;
63 }
64
65
66
67
68 public void setItemWriter(ItemWriter<? super O> itemWriter) {
69 this.itemWriter = itemWriter;
70 }
71
72
73
74
75
76
77 @Override
78 public void afterPropertiesSet() throws Exception {
79 Assert.notNull(itemWriter, "ItemWriter must be set");
80 Assert.notNull(itemProcessor, "ItemProcessor must be set");
81 }
82
83
84
85
86
87
88
89 public void setListeners(List<? extends StepListener> listeners) {
90 for (StepListener listener : listeners) {
91 registerListener(listener);
92 }
93 }
94
95
96
97
98
99
100 public void registerListener(StepListener listener) {
101 this.listener.register(listener);
102 }
103
104
105
106
107 protected MulticasterBatchListener<I, O> getListener() {
108 return listener;
109 }
110
111
112
113
114
115
116 protected final O doProcess(I item) throws Exception {
117
118 if (itemProcessor == null) {
119 @SuppressWarnings("unchecked")
120 O result = (O) item;
121 return result;
122 }
123
124 try {
125 listener.beforeProcess(item);
126 O result = itemProcessor.process(item);
127 listener.afterProcess(item, result);
128 return result;
129 }
130 catch (Exception e) {
131 listener.onProcessError(item, e);
132 throw e;
133 }
134
135 }
136
137
138
139
140
141
142
143 protected final void doWrite(List<O> items) throws Exception {
144
145 if (itemWriter == null) {
146 return;
147 }
148
149 try {
150 listener.beforeWrite(items);
151 writeItems(items);
152 doAfterWrite(items);
153 }
154 catch (Exception e) {
155 doOnWriteError(e, items);
156 throw e;
157 }
158
159 }
160
161
162
163
164
165
166 protected final void doAfterWrite(List<O> items) {
167 listener.afterWrite(items);
168 }
169 protected final void doOnWriteError(Exception e, List<O> items) {
170 listener.onWriteError(e, items);
171 }
172
173 protected void writeItems(List<O> items) throws Exception {
174 if (itemWriter != null) {
175 itemWriter.write(items);
176 }
177 }
178
179 @Override
180 public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
181
182
183 initializeUserData(inputs);
184
185
186 if (isComplete(inputs)) {
187 return;
188 }
189
190
191
192 Chunk<O> outputs = transform(contribution, inputs);
193
194
195 contribution.incrementFilterCount(getFilterCount(inputs, outputs));
196
197
198
199 write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
200
201 }
202
203
204
205
206
207
208
209
210
211
212
213
214 protected void initializeUserData(Chunk<I> inputs) {
215 inputs.setUserData(inputs.size());
216 }
217
218
219
220
221
222
223
224
225
226
227
228
229 protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
230 return (Integer) inputs.getUserData() - outputs.size();
231 }
232
233
234
235
236
237
238
239
240
241
242 protected boolean isComplete(Chunk<I> inputs) {
243 return inputs.isEmpty();
244 }
245
246
247
248
249
250
251
252
253
254
255
256
257 protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
258 return outputs;
259 }
260
261
262
263
264
265
266
267
268
269
270
271
272 protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
273 try {
274 doWrite(outputs.getItems());
275 }
276 catch (Exception e) {
277
278
279
280
281 inputs.clear();
282 throw e;
283 }
284 contribution.incrementWriteCount(outputs.size());
285 }
286
287 protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
288 Chunk<O> outputs = new Chunk<O>();
289 for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
290 final I item = iterator.next();
291 O output;
292 try {
293 output = doProcess(item);
294 }
295 catch (Exception e) {
296
297
298
299
300 inputs.clear();
301 throw e;
302 }
303 if (output != null) {
304 outputs.add(output);
305 }
306 else {
307 iterator.remove();
308 }
309 }
310 return outputs;
311 }
312
313 }