1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.listener;
17
18 import java.util.List;
19
20 import org.springframework.batch.core.ChunkListener;
21 import org.springframework.batch.core.ExitStatus;
22 import org.springframework.batch.core.ItemProcessListener;
23 import org.springframework.batch.core.ItemReadListener;
24 import org.springframework.batch.core.ItemWriteListener;
25 import org.springframework.batch.core.SkipListener;
26 import org.springframework.batch.core.StepExecution;
27 import org.springframework.batch.core.StepExecutionListener;
28 import org.springframework.batch.core.StepListener;
29 import org.springframework.batch.core.scope.context.ChunkContext;
30 import org.springframework.batch.item.ItemStream;
31
32
33
34
35
36 public class MulticasterBatchListener<T, S> implements StepExecutionListener, ChunkListener, ItemReadListener<T>,
37 ItemProcessListener<T, S>, ItemWriteListener<S>, SkipListener<T, S> {
38
39 private CompositeStepExecutionListener stepListener = new CompositeStepExecutionListener();
40
41 private CompositeChunkListener chunkListener = new CompositeChunkListener();
42
43 private CompositeItemReadListener<T> itemReadListener = new CompositeItemReadListener<T>();
44
45 private CompositeItemProcessListener<T, S> itemProcessListener = new CompositeItemProcessListener<T, S>();
46
47 private CompositeItemWriteListener<S> itemWriteListener = new CompositeItemWriteListener<S>();
48
49 private CompositeSkipListener<T, S> skipListener = new CompositeSkipListener<T, S>();
50
51
52
53
54 public MulticasterBatchListener() {
55 super();
56 }
57
58
59
60
61
62
63
64 public void setListeners(List<? extends StepListener> listeners) {
65 for (StepListener stepListener : listeners) {
66 register(stepListener);
67 }
68 }
69
70
71
72
73
74
75 public void register(StepListener listener) {
76 if (listener instanceof StepExecutionListener) {
77 this.stepListener.register((StepExecutionListener) listener);
78 }
79 if (listener instanceof ChunkListener) {
80 this.chunkListener.register((ChunkListener) listener);
81 }
82 if (listener instanceof ItemReadListener<?>) {
83 @SuppressWarnings("unchecked")
84 ItemReadListener<T> itemReadListener = (ItemReadListener<T>) listener;
85 this.itemReadListener.register(itemReadListener);
86 }
87 if (listener instanceof ItemProcessListener<?, ?>) {
88 @SuppressWarnings("unchecked")
89 ItemProcessListener<T, S> itemProcessListener = (ItemProcessListener<T, S>) listener;
90 this.itemProcessListener.register(itemProcessListener);
91 }
92 if (listener instanceof ItemWriteListener<?>) {
93 @SuppressWarnings("unchecked")
94 ItemWriteListener<S> itemWriteListener = (ItemWriteListener<S>) listener;
95 this.itemWriteListener.register(itemWriteListener);
96 }
97 if (listener instanceof SkipListener<?, ?>) {
98 @SuppressWarnings("unchecked")
99 SkipListener<T, S> skipListener = (SkipListener<T, S>) listener;
100 this.skipListener.register(skipListener);
101 }
102 }
103
104
105
106
107
108
109
110 @Override
111 public void afterProcess(T item, S result) {
112 try {
113 itemProcessListener.afterProcess(item, result);
114 }
115 catch (RuntimeException e) {
116 throw new StepListenerFailedException("Error in afterProcess.", e);
117 }
118 }
119
120
121
122
123
124 @Override
125 public void beforeProcess(T item) {
126 try {
127 itemProcessListener.beforeProcess(item);
128 }
129 catch (RuntimeException e) {
130 throw new StepListenerFailedException("Error in beforeProcess.", e);
131 }
132 }
133
134
135
136
137
138
139
140 @Override
141 public void onProcessError(T item, Exception ex) {
142 try {
143 itemProcessListener.onProcessError(item, ex);
144 }
145 catch (RuntimeException e) {
146 throw new StepListenerFailedException("Error in onProcessError.", e);
147 }
148 }
149
150
151
152
153 @Override
154 public ExitStatus afterStep(StepExecution stepExecution) {
155 try {
156 return stepListener.afterStep(stepExecution);
157 }
158 catch (RuntimeException e) {
159 throw new StepListenerFailedException("Error in afterStep.", e);
160 }
161 }
162
163
164
165
166
167 @Override
168 public void beforeStep(StepExecution stepExecution) {
169 try {
170 stepListener.beforeStep(stepExecution);
171 }
172 catch (RuntimeException e) {
173 throw new StepListenerFailedException("Error in beforeStep.", e);
174 }
175 }
176
177
178
179
180
181 @Override
182 public void afterChunk(ChunkContext context) {
183 try {
184 chunkListener.afterChunk(context);
185 }
186 catch (RuntimeException e) {
187 throw new StepListenerFailedException("Error in afterChunk.", e);
188 }
189 }
190
191
192
193
194
195 @Override
196 public void beforeChunk(ChunkContext context) {
197 try {
198 chunkListener.beforeChunk(context);
199 }
200 catch (RuntimeException e) {
201 throw new StepListenerFailedException("Error in beforeChunk.", e);
202 }
203 }
204
205
206
207
208
209 @Override
210 public void afterRead(T item) {
211 try {
212 itemReadListener.afterRead(item);
213 }
214 catch (RuntimeException e) {
215 throw new StepListenerFailedException("Error in afterRead.", e);
216 }
217 }
218
219
220
221
222
223 @Override
224 public void beforeRead() {
225 try {
226 itemReadListener.beforeRead();
227 }
228 catch (RuntimeException e) {
229 throw new StepListenerFailedException("Error in beforeRead.", e);
230 }
231 }
232
233
234
235
236
237 @Override
238 public void onReadError(Exception ex) {
239 try {
240 itemReadListener.onReadError(ex);
241 }
242 catch (RuntimeException e) {
243 throw new StepListenerFailedException("Error in onReadError.", e);
244 }
245 }
246
247
248
249
250
251 @Override
252 public void afterWrite(List<? extends S> items) {
253 try {
254 itemWriteListener.afterWrite(items);
255 }
256 catch (RuntimeException e) {
257 throw new StepListenerFailedException("Error in afterWrite.", e);
258 }
259 }
260
261
262
263
264
265 @Override
266 public void beforeWrite(List<? extends S> items) {
267 try {
268 itemWriteListener.beforeWrite(items);
269 }
270 catch (RuntimeException e) {
271 throw new StepListenerFailedException("Error in beforeWrite.", e);
272 }
273 }
274
275
276
277
278
279
280 @Override
281 public void onWriteError(Exception ex, List<? extends S> items) {
282 try {
283 itemWriteListener.onWriteError(ex, items);
284 }
285 catch (RuntimeException e) {
286 throw new StepListenerFailedException("Error in onWriteError.", e);
287 }
288 }
289
290
291
292
293
294 @Override
295 public void onSkipInRead(Throwable t) {
296 skipListener.onSkipInRead(t);
297 }
298
299
300
301
302
303
304
305 @Override
306 public void onSkipInWrite(S item, Throwable t) {
307 skipListener.onSkipInWrite(item, t);
308 }
309
310
311
312
313
314
315
316 @Override
317 public void onSkipInProcess(T item, Throwable t) {
318 skipListener.onSkipInProcess(item, t);
319 }
320
321 @Override
322 public void afterChunkError(ChunkContext context) {
323 try {
324 chunkListener.afterChunkError(context);
325 }
326 catch (RuntimeException e) {
327 throw new StepListenerFailedException("Error in afterFailedChunk.", e);
328 }
329 }
330 }