1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.repeat.support;
18
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.springframework.batch.repeat.CompletionPolicy;
27 import org.springframework.batch.repeat.RepeatCallback;
28 import org.springframework.batch.repeat.RepeatContext;
29 import org.springframework.batch.repeat.RepeatException;
30 import org.springframework.batch.repeat.RepeatListener;
31 import org.springframework.batch.repeat.RepeatOperations;
32 import org.springframework.batch.repeat.RepeatStatus;
33 import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
34 import org.springframework.batch.repeat.exception.ExceptionHandler;
35 import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy;
36 import org.springframework.util.Assert;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class RepeatTemplate implements RepeatOperations {
66
67 protected Log logger = LogFactory.getLog(getClass());
68
69 private RepeatListener[] listeners = new RepeatListener[] {};
70
71 private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy();
72
73 private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
74
75
76
77
78
79
80
81 public void setListeners(RepeatListener[] listeners) {
82 this.listeners = Arrays.asList(listeners).toArray(new RepeatListener[listeners.length]);
83 }
84
85
86
87
88
89
90 public void registerListener(RepeatListener listener) {
91 List<RepeatListener> list = new ArrayList<RepeatListener>(Arrays.asList(listeners));
92 list.add(listener);
93 listeners = (RepeatListener[]) list.toArray(new RepeatListener[list.size()]);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107 public void setExceptionHandler(ExceptionHandler exceptionHandler) {
108 this.exceptionHandler = exceptionHandler;
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122
123 public void setCompletionPolicy(CompletionPolicy terminationPolicy) {
124 Assert.notNull(terminationPolicy);
125 this.completionPolicy = terminationPolicy;
126 }
127
128
129
130
131
132
133
134
135 @Override
136 public RepeatStatus iterate(RepeatCallback callback) {
137
138 RepeatContext outer = RepeatSynchronizationManager.getContext();
139
140 RepeatStatus result = RepeatStatus.CONTINUABLE;
141 try {
142
143
144 result = executeInternal(callback);
145 }
146 finally {
147 RepeatSynchronizationManager.clear();
148 if (outer != null) {
149 RepeatSynchronizationManager.register(outer);
150 }
151 }
152
153 return result;
154 }
155
156
157
158
159
160
161
162
163
164
165
166 private RepeatStatus executeInternal(final RepeatCallback callback) {
167
168
169 RepeatContext context = start();
170
171
172
173 boolean running = !isMarkedComplete(context);
174
175 for (int i = 0; i < listeners.length; i++) {
176 RepeatListener interceptor = listeners[i];
177 interceptor.open(context);
178 running = running && !isMarkedComplete(context);
179 if (!running)
180 break;
181 }
182
183
184 RepeatStatus result = RepeatStatus.CONTINUABLE;
185
186 RepeatInternalState state = createInternalState(context);
187
188 Collection<Throwable> throwables = state.getThrowables();
189
190
191 Collection<Throwable> deferred = new ArrayList<Throwable>();
192
193 try {
194
195 while (running) {
196
197
198
199
200
201
202 for (int i = 0; i < listeners.length; i++) {
203 RepeatListener interceptor = listeners[i];
204 interceptor.before(context);
205
206
207 running = running && !isMarkedComplete(context);
208 }
209
210
211 if (running) {
212
213 try {
214
215 result = getNextResult(context, callback, state);
216 executeAfterInterceptors(context, result);
217
218 }
219 catch (Throwable throwable) {
220 doHandle(throwable, context, deferred);
221 }
222
223
224 if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
225 running = false;
226 }
227
228 }
229
230 }
231
232 result = result.and(waitForResults(state));
233 for (Throwable throwable : throwables) {
234 doHandle(throwable, context, deferred);
235 }
236
237
238 state = null;
239
240 }
241
242
243
244
245
246 finally {
247
248 try {
249
250 if (!deferred.isEmpty()) {
251 Throwable throwable = (Throwable) deferred.iterator().next();
252 logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): "
253 + throwable.getClass().getName() + ": " + throwable.getMessage());
254 rethrow(throwable);
255 }
256
257 }
258 finally {
259
260 try {
261 for (int i = listeners.length; i-- > 0;) {
262 RepeatListener interceptor = listeners[i];
263 interceptor.close(context);
264 }
265 }
266 finally {
267 context.close();
268 }
269
270 }
271
272 }
273
274 return result;
275
276 }
277
278 private void doHandle(Throwable throwable, RepeatContext context, Collection<Throwable> deferred) {
279
280
281 Throwable unwrappedThrowable = unwrapIfRethrown(throwable);
282 try {
283
284 for (int i = listeners.length; i-- > 0;) {
285 RepeatListener interceptor = listeners[i];
286
287
288 logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")", unwrappedThrowable);
289 interceptor.onError(context, unwrappedThrowable);
290 }
291
292 logger.debug("Handling exception: " + throwable.getClass().getName() + ", caused by: "
293 + unwrappedThrowable.getClass().getName() + ": " + unwrappedThrowable.getMessage());
294 exceptionHandler.handleException(context, unwrappedThrowable);
295
296 }
297 catch (Throwable handled) {
298 deferred.add(handled);
299 }
300 }
301
302
303
304
305
306 private static void rethrow(Throwable throwable) throws RuntimeException {
307 if (throwable instanceof Error) {
308 throw (Error) throwable;
309 }
310 else if (throwable instanceof RuntimeException) {
311 throw (RuntimeException) throwable;
312 }
313 else {
314 throw new RepeatException("Exception in batch process", throwable);
315 }
316 }
317
318
319
320
321
322 private static Throwable unwrapIfRethrown(Throwable throwable) {
323 if (throwable instanceof RepeatException) {
324 return throwable.getCause();
325 }
326 else {
327 return throwable;
328 }
329 }
330
331
332
333
334
335
336
337
338
339
340
341
342
343 protected RepeatInternalState createInternalState(RepeatContext context) {
344 return new RepeatInternalStateSupport();
345 }
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362 protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
363 throws Throwable {
364 update(context);
365 if (logger.isDebugEnabled()) {
366 logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
367 }
368 return callback.doInIteration(context);
369
370 }
371
372
373
374
375
376
377
378
379
380 protected boolean waitForResults(RepeatInternalState state) {
381
382 return true;
383 }
384
385
386
387
388
389
390
391 protected final boolean canContinue(RepeatStatus value) {
392 return ((RepeatStatus) value).isContinuable();
393 }
394
395 private boolean isMarkedComplete(RepeatContext context) {
396 boolean complete = context.isCompleteOnly();
397 if (context.getParent() != null) {
398 complete = complete || isMarkedComplete(context.getParent());
399 }
400 if (complete) {
401 logger.debug("Repeat is complete according to context alone.");
402 }
403 return complete;
404
405 }
406
407
408
409
410
411
412
413 protected void executeAfterInterceptors(final RepeatContext context, RepeatStatus value) {
414
415
416
417
418 if (value != null && value.isContinuable()) {
419 for (int i = listeners.length; i-- > 0;) {
420 RepeatListener interceptor = listeners[i];
421 interceptor.after(context, value);
422 }
423
424 }
425
426 }
427
428
429
430
431
432
433
434 protected boolean isComplete(RepeatContext context, RepeatStatus result) {
435 boolean complete = completionPolicy.isComplete(context, result);
436 if (complete) {
437 logger.debug("Repeat is complete according to policy and result value.");
438 }
439 return complete;
440 }
441
442
443
444
445
446
447 protected boolean isComplete(RepeatContext context) {
448 boolean complete = completionPolicy.isComplete(context);
449 if (complete) {
450 logger.debug("Repeat is complete according to policy alone not including result.");
451 }
452 return complete;
453 }
454
455
456
457
458
459
460 protected RepeatContext start() {
461 RepeatContext parent = RepeatSynchronizationManager.getContext();
462 RepeatContext context = completionPolicy.start(parent);
463 RepeatSynchronizationManager.register(context);
464 logger.debug("Starting repeat context.");
465 return context;
466 }
467
468
469
470
471
472
473 protected void update(RepeatContext context) {
474 completionPolicy.update(context);
475 }
476
477 }