1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.step.tasklet;
17
18 import java.util.concurrent.Semaphore;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.springframework.batch.core.BatchStatus;
23 import org.springframework.batch.core.ChunkListener;
24 import org.springframework.batch.core.JobInterruptedException;
25 import org.springframework.batch.core.StepContribution;
26 import org.springframework.batch.core.StepExecution;
27 import org.springframework.batch.core.StepExecutionListener;
28 import org.springframework.batch.core.listener.CompositeChunkListener;
29 import org.springframework.batch.core.repository.JobRepository;
30 import org.springframework.batch.core.scope.context.ChunkContext;
31 import org.springframework.batch.core.scope.context.StepContextRepeatCallback;
32 import org.springframework.batch.core.step.AbstractStep;
33 import org.springframework.batch.core.step.FatalStepExecutionException;
34 import org.springframework.batch.core.step.StepInterruptionPolicy;
35 import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
36 import org.springframework.batch.item.ExecutionContext;
37 import org.springframework.batch.item.ItemReader;
38 import org.springframework.batch.item.ItemStream;
39 import org.springframework.batch.item.ItemWriter;
40 import org.springframework.batch.item.support.CompositeItemStream;
41 import org.springframework.batch.repeat.RepeatContext;
42 import org.springframework.batch.repeat.RepeatOperations;
43 import org.springframework.batch.repeat.RepeatStatus;
44 import org.springframework.batch.repeat.support.RepeatTemplate;
45 import org.springframework.transaction.PlatformTransactionManager;
46 import org.springframework.transaction.TransactionStatus;
47 import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
48 import org.springframework.transaction.interceptor.TransactionAttribute;
49 import org.springframework.transaction.support.TransactionCallback;
50 import org.springframework.transaction.support.TransactionSynchronization;
51 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
52 import org.springframework.transaction.support.TransactionSynchronizationManager;
53 import org.springframework.transaction.support.TransactionTemplate;
54 import org.springframework.util.Assert;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 @SuppressWarnings("serial")
76 public class TaskletStep extends AbstractStep {
77
78 private static final Log logger = LogFactory.getLog(TaskletStep.class);
79
80 private RepeatOperations stepOperations = new RepeatTemplate();
81
82 private CompositeChunkListener chunkListener = new CompositeChunkListener();
83
84
85 private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
86
87 private CompositeItemStream stream = new CompositeItemStream();
88
89 private PlatformTransactionManager transactionManager;
90
91 private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
92
93 @Override
94 public boolean rollbackOn(Throwable ex) {
95 return true;
96 }
97
98 };
99
100 private Tasklet tasklet;
101
102
103
104
105 public TaskletStep() {
106 this(null);
107 }
108
109
110
111
112 public TaskletStep(String name) {
113 super(name);
114 }
115
116
117
118
119
120
121
122 @Override
123 public void afterPropertiesSet() throws Exception {
124 super.afterPropertiesSet();
125 Assert.state(transactionManager != null, "A transaction manager must be provided");
126 }
127
128
129
130
131
132
133 public void setTransactionManager(PlatformTransactionManager transactionManager) {
134 this.transactionManager = transactionManager;
135 }
136
137
138
139
140
141
142 public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
143 this.transactionAttribute = transactionAttribute;
144 }
145
146
147
148
149
150
151 public void setTasklet(Tasklet tasklet) {
152 this.tasklet = tasklet;
153 if (tasklet instanceof StepExecutionListener) {
154 registerStepExecutionListener((StepExecutionListener) tasklet);
155 }
156 }
157
158
159
160
161
162
163
164 public void registerChunkListener(ChunkListener listener) {
165 this.chunkListener.register(listener);
166 }
167
168
169
170
171
172
173 public void setChunkListeners(ChunkListener[] listeners) {
174 for (int i = 0; i < listeners.length; i++) {
175 registerChunkListener(listeners[i]);
176 }
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190 public void setStreams(ItemStream[] streams) {
191 for (int i = 0; i < streams.length; i++) {
192 registerStream(streams[i]);
193 }
194 }
195
196
197
198
199
200
201
202 public void registerStream(ItemStream stream) {
203 this.stream.register(stream);
204 }
205
206
207
208
209
210
211
212
213 public void setStepOperations(RepeatOperations stepOperations) {
214 this.stepOperations = stepOperations;
215 }
216
217
218
219
220
221
222
223
224 public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
225 this.interruptionPolicy = interruptionPolicy;
226 }
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242 @Override
243 @SuppressWarnings("unchecked")
244 protected void doExecute(StepExecution stepExecution) throws Exception {
245
246 stream.update(stepExecution.getExecutionContext());
247 getJobRepository().updateExecutionContext(stepExecution);
248
249
250
251 final Semaphore semaphore = createSemaphore();
252
253 stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
254
255 @Override
256 public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
257 throws Exception {
258
259 StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
260
261
262
263 interruptionPolicy.checkInterrupted(stepExecution);
264
265 RepeatStatus result;
266 try {
267 result = (RepeatStatus) new TransactionTemplate(transactionManager, transactionAttribute)
268 .execute(new ChunkTransactionCallback(chunkContext, semaphore));
269 }
270 catch (UncheckedTransactionException e) {
271
272 throw (Exception) e.getCause();
273 }
274
275 chunkListener.afterChunk(chunkContext);
276
277
278
279
280 interruptionPolicy.checkInterrupted(stepExecution);
281
282 return result;
283 }
284
285 });
286
287 }
288
289
290
291
292
293
294
295 protected Semaphore createSemaphore() {
296 return new Semaphore(1);
297 }
298
299 @Override
300 protected void close(ExecutionContext ctx) throws Exception {
301 stream.close();
302 }
303
304 @Override
305 protected void open(ExecutionContext ctx) throws Exception {
306 stream.open(ctx);
307 }
308
309
310
311
312
313
314
315
316
317
318 @SuppressWarnings("rawtypes")
319 private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback {
320
321 private final StepExecution stepExecution;
322
323 private final ChunkContext chunkContext;
324
325 private boolean rolledBack = false;
326
327 private boolean stepExecutionUpdated = false;
328
329 private StepExecution oldVersion;
330
331 private boolean locked = false;
332
333 private final Semaphore semaphore;
334
335 public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) {
336 this.chunkContext = chunkContext;
337 this.stepExecution = chunkContext.getStepContext().getStepExecution();
338 this.semaphore = semaphore;
339 }
340
341 @Override
342 public void afterCompletion(int status) {
343 try {
344 if (status != TransactionSynchronization.STATUS_COMMITTED) {
345 if (stepExecutionUpdated) {
346
347
348 logger.info("Commit failed while step execution data was already updated. "
349 + "Reverting to old version.");
350 copy(oldVersion, stepExecution);
351 if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
352 rollback(stepExecution);
353 }
354 }
355 chunkListener.afterChunkError(chunkContext);
356 }
357 if (status == TransactionSynchronization.STATUS_UNKNOWN) {
358 logger.error("Rolling back with transaction in unknown state");
359 rollback(stepExecution);
360 stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
361 stepExecution.setTerminateOnly();
362 }
363 }
364 finally {
365
366
367 if (locked) {
368 semaphore.release();
369 }
370 locked = false;
371 }
372 }
373
374 @Override
375 public Object doInTransaction(TransactionStatus status) {
376
377 TransactionSynchronizationManager.registerSynchronization(this);
378
379 RepeatStatus result = RepeatStatus.CONTINUABLE;
380
381 StepContribution contribution = stepExecution.createStepContribution();
382
383 chunkListener.beforeChunk(chunkContext);
384
385
386
387 oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
388 copy(stepExecution, oldVersion);
389
390 try {
391
392 try {
393 try {
394 result = tasklet.execute(contribution, chunkContext);
395 if (result == null) {
396 result = RepeatStatus.FINISHED;
397 }
398 }
399 catch (Exception e) {
400 if (transactionAttribute.rollbackOn(e)) {
401 chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
402 throw e;
403 }
404 }
405 }
406 finally {
407
408
409
410
411
412 try {
413 semaphore.acquire();
414 locked = true;
415 }
416 catch (InterruptedException e) {
417 logger.error("Thread interrupted while locking for repository update");
418 stepExecution.setStatus(BatchStatus.STOPPED);
419 stepExecution.setTerminateOnly();
420 Thread.currentThread().interrupt();
421 }
422
423
424
425 logger.debug("Applying contribution: " + contribution);
426 stepExecution.apply(contribution);
427
428 }
429
430 stepExecutionUpdated = true;
431
432 stream.update(stepExecution.getExecutionContext());
433
434 try {
435
436
437 getJobRepository().updateExecutionContext(stepExecution);
438 stepExecution.incrementCommitCount();
439 logger.debug("Saving step execution before commit: " + stepExecution);
440 getJobRepository().update(stepExecution);
441 }
442 catch (Exception e) {
443
444
445 String msg = "JobRepository failure forcing exit with unknown status";
446 logger.error(msg, e);
447 stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
448 stepExecution.setTerminateOnly();
449 throw new FatalStepExecutionException(msg, e);
450 }
451
452 }
453 catch (Error e) {
454 logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
455 rollback(stepExecution);
456 throw e;
457 }
458 catch (RuntimeException e) {
459 logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
460 rollback(stepExecution);
461 throw e;
462 }
463 catch (Exception e) {
464 logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
465 rollback(stepExecution);
466
467 throw new UncheckedTransactionException(e);
468 }
469
470 return result;
471
472 }
473
474 private void rollback(StepExecution stepExecution) {
475 if (!rolledBack) {
476 stepExecution.incrementRollbackCount();
477 rolledBack = true;
478 }
479 }
480
481 private void copy(final StepExecution source, final StepExecution target) {
482 target.setVersion(source.getVersion());
483 target.setWriteCount(source.getWriteCount());
484 target.setFilterCount(source.getFilterCount());
485 target.setCommitCount(source.getCommitCount());
486 target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
487 }
488
489 }
490
491
492
493
494
495
496
497
498 private static class UncheckedTransactionException extends RuntimeException {
499
500 public UncheckedTransactionException(Exception e) {
501 super(e);
502 }
503
504 }
505
506 }