1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.step;
17
18 import java.util.Date;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.springframework.batch.core.BatchStatus;
23 import org.springframework.batch.core.ExitStatus;
24 import org.springframework.batch.core.JobInterruptedException;
25 import org.springframework.batch.core.Step;
26 import org.springframework.batch.core.StepExecution;
27 import org.springframework.batch.core.StepExecutionListener;
28 import org.springframework.batch.core.UnexpectedJobExecutionException;
29 import org.springframework.batch.core.launch.NoSuchJobException;
30 import org.springframework.batch.core.launch.support.ExitCodeMapper;
31 import org.springframework.batch.core.listener.CompositeStepExecutionListener;
32 import org.springframework.batch.core.repository.JobRepository;
33 import org.springframework.batch.core.scope.context.StepSynchronizationManager;
34 import org.springframework.batch.item.ExecutionContext;
35 import org.springframework.batch.repeat.RepeatException;
36 import org.springframework.beans.factory.BeanNameAware;
37 import org.springframework.beans.factory.InitializingBean;
38 import org.springframework.util.Assert;
39 import org.springframework.util.ClassUtils;
40
41
42
43
44
45
46
47
48
49 public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {
50
51 private static final Log logger = LogFactory.getLog(AbstractStep.class);
52
53 private String name;
54
55 private int startLimit = Integer.MAX_VALUE;
56
57 private boolean allowStartIfComplete = false;
58
59 private CompositeStepExecutionListener stepExecutionListener = new CompositeStepExecutionListener();
60
61 private JobRepository jobRepository;
62
63
64
65
66 public AbstractStep() {
67 super();
68 }
69
70 @Override
71 public void afterPropertiesSet() throws Exception {
72 Assert.state(name != null, "A Step must have a name");
73 Assert.state(jobRepository != null, "JobRepository is mandatory");
74 }
75
76 @Override
77 public String getName() {
78 return this.name;
79 }
80
81
82
83
84
85
86 public void setName(String name) {
87 this.name = name;
88 }
89
90
91
92
93
94
95
96
97 @Override
98 public void setBeanName(String name) {
99 if (this.name == null) {
100 this.name = name;
101 }
102 }
103
104 @Override
105 public int getStartLimit() {
106 return this.startLimit;
107 }
108
109
110
111
112
113
114 public void setStartLimit(int startLimit) {
115 this.startLimit = startLimit;
116 }
117
118 @Override
119 public boolean isAllowStartIfComplete() {
120 return this.allowStartIfComplete;
121 }
122
123
124
125
126
127
128
129 public void setAllowStartIfComplete(boolean allowStartIfComplete) {
130 this.allowStartIfComplete = allowStartIfComplete;
131 }
132
133
134
135
136
137
138 public AbstractStep(String name) {
139 this.name = name;
140 }
141
142
143
144
145
146
147
148
149 protected abstract void doExecute(StepExecution stepExecution) throws Exception;
150
151
152
153
154
155
156
157
158 protected void open(ExecutionContext ctx) throws Exception {
159 }
160
161
162
163
164
165
166
167
168 protected void close(ExecutionContext ctx) throws Exception {
169 }
170
171
172
173
174
175
176 @Override
177 public final void execute(StepExecution stepExecution) throws JobInterruptedException,
178 UnexpectedJobExecutionException {
179
180 logger.debug("Executing: id=" + stepExecution.getId());
181 stepExecution.setStartTime(new Date());
182 stepExecution.setStatus(BatchStatus.STARTED);
183 getJobRepository().update(stepExecution);
184
185
186 ExitStatus exitStatus = ExitStatus.EXECUTING;
187
188 StepSynchronizationManager.register(stepExecution);
189
190 try {
191 getCompositeListener().beforeStep(stepExecution);
192 open(stepExecution.getExecutionContext());
193
194 try {
195 doExecute(stepExecution);
196 }
197 catch (RepeatException e) {
198 throw e.getCause();
199 }
200 exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
201
202
203 if (stepExecution.isTerminateOnly()) {
204 throw new JobInterruptedException("JobExecution interrupted.");
205 }
206
207
208 stepExecution.upgradeStatus(BatchStatus.COMPLETED);
209 logger.debug("Step execution success: id=" + stepExecution.getId());
210 }
211 catch (Throwable e) {
212 stepExecution.upgradeStatus(determineBatchStatus(e));
213 exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
214 stepExecution.addFailureException(e);
215 if (stepExecution.getStatus() == BatchStatus.STOPPED) {
216 logger.info("Encountered interruption executing step: " + e.getMessage());
217 if (logger.isDebugEnabled()) {
218 logger.debug("Full exception", e);
219 }
220 }
221 else {
222 logger.error("Encountered an error executing the step", e);
223 }
224 }
225 finally {
226
227 try {
228
229
230 exitStatus = exitStatus.and(stepExecution.getExitStatus());
231 stepExecution.setExitStatus(exitStatus);
232 exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
233 }
234 catch (Exception e) {
235 logger.error("Exception in afterStep callback", e);
236 }
237
238 try {
239 getJobRepository().updateExecutionContext(stepExecution);
240 }
241 catch (Exception e) {
242 stepExecution.setStatus(BatchStatus.UNKNOWN);
243 exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
244 stepExecution.addFailureException(e);
245 logger.error("Encountered an error saving batch meta data. "
246 + "This job is now in an unknown state and should not be restarted.", e);
247 }
248
249 stepExecution.setEndTime(new Date());
250 stepExecution.setExitStatus(exitStatus);
251
252 try {
253 getJobRepository().update(stepExecution);
254 }
255 catch (Exception e) {
256 stepExecution.setStatus(BatchStatus.UNKNOWN);
257 stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
258 stepExecution.addFailureException(e);
259 logger.error("Encountered an error saving batch meta data. "
260 + "This job is now in an unknown state and should not be restarted.", e);
261 }
262
263 try {
264 close(stepExecution.getExecutionContext());
265 }
266 catch (Exception e) {
267 logger.error("Exception while closing step execution resources", e);
268 stepExecution.addFailureException(e);
269 }
270
271 StepSynchronizationManager.release();
272
273 logger.debug("Step execution complete: " + stepExecution.getSummary());
274 }
275 }
276
277
278
279
280 private static BatchStatus determineBatchStatus(Throwable e) {
281 if (e instanceof JobInterruptedException || e.getCause() instanceof JobInterruptedException) {
282 return BatchStatus.STOPPED;
283 }
284 else {
285 return BatchStatus.FAILED;
286 }
287 }
288
289
290
291
292
293
294 public void registerStepExecutionListener(StepExecutionListener listener) {
295 this.stepExecutionListener.register(listener);
296 }
297
298
299
300
301
302
303 public void setStepExecutionListeners(StepExecutionListener[] listeners) {
304 for (int i = 0; i < listeners.length; i++) {
305 registerStepExecutionListener(listeners[i]);
306 }
307 }
308
309
310
311
312 protected StepExecutionListener getCompositeListener() {
313 return stepExecutionListener;
314 }
315
316
317
318
319
320
321 public void setJobRepository(JobRepository jobRepository) {
322 this.jobRepository = jobRepository;
323 }
324
325 protected JobRepository getJobRepository() {
326 return jobRepository;
327 }
328
329 @Override
330 public String toString() {
331 return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]";
332 }
333
334
335
336
337
338
339
340
341 private ExitStatus getDefaultExitStatusForFailure(Throwable ex) {
342 ExitStatus exitStatus;
343 if (ex instanceof JobInterruptedException || ex.getCause() instanceof JobInterruptedException) {
344 exitStatus = ExitStatus.STOPPED.addExitDescription(JobInterruptedException.class.getName());
345 }
346 else if (ex instanceof NoSuchJobException || ex.getCause() instanceof NoSuchJobException) {
347 exitStatus = new ExitStatus(ExitCodeMapper.NO_SUCH_JOB, ex.getClass().getName());
348 }
349 else {
350 exitStatus = ExitStatus.FAILED.addExitDescription(ex);
351 }
352
353 return exitStatus;
354 }
355
356 }