1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.job;
18
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.springframework.batch.core.BatchStatus;
22 import org.springframework.batch.core.JobExecution;
23 import org.springframework.batch.core.JobInstance;
24 import org.springframework.batch.core.JobInterruptedException;
25 import org.springframework.batch.core.StartLimitExceededException;
26 import org.springframework.batch.core.Step;
27 import org.springframework.batch.core.StepExecution;
28 import org.springframework.batch.core.repository.JobRepository;
29 import org.springframework.batch.core.repository.JobRestartException;
30 import org.springframework.batch.item.ExecutionContext;
31 import org.springframework.beans.factory.InitializingBean;
32 import org.springframework.util.Assert;
33
34
35
36
37
38
39
40
41 public class SimpleStepHandler implements StepHandler, InitializingBean {
42
43 private static final Log logger = LogFactory.getLog(SimpleStepHandler.class);
44
45 private JobRepository jobRepository;
46
47 private ExecutionContext executionContext;
48
49
50
51
52 public SimpleStepHandler() {
53 this(null);
54 }
55
56
57
58
59 public SimpleStepHandler(JobRepository jobRepository) {
60 this(jobRepository, new ExecutionContext());
61 }
62
63
64
65
66
67 public SimpleStepHandler(JobRepository jobRepository, ExecutionContext executionContext) {
68 this.jobRepository = jobRepository;
69 this.executionContext = executionContext;
70 }
71
72
73
74
75
76
77 @Override
78 public void afterPropertiesSet() throws Exception {
79 Assert.state(jobRepository != null, "A JobRepository must be provided");
80 }
81
82
83
84
85 public void setJobRepository(JobRepository jobRepository) {
86 this.jobRepository = jobRepository;
87 }
88
89
90
91
92
93
94
95 public void setExecutionContext(ExecutionContext executionContext) {
96 this.executionContext = executionContext;
97 }
98
99 @Override
100 public StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException,
101 JobRestartException, StartLimitExceededException {
102 if (execution.isStopping()) {
103 throw new JobInterruptedException("JobExecution interrupted.");
104 }
105
106 JobInstance jobInstance = execution.getJobInstance();
107
108 StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
109 if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) {
110
111
112 logger.info(String.format("Duplicate step [%s] detected in execution of job=[%s]. "
113 + "If either step fails, both will be executed again on restart.", step.getName(), jobInstance
114 .getJobName()));
115 lastStepExecution = null;
116 }
117 StepExecution currentStepExecution = lastStepExecution;
118
119 if (shouldStart(lastStepExecution, jobInstance, step)) {
120
121 currentStepExecution = execution.createStepExecution(step.getName());
122
123 boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals(
124 BatchStatus.COMPLETED));
125
126 if (isRestart) {
127 currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
128 }
129 else {
130 currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
131 }
132
133 jobRepository.add(currentStepExecution);
134
135 logger.info("Executing step: [" + step.getName() + "]");
136 try {
137 step.execute(currentStepExecution);
138 }
139 catch (JobInterruptedException e) {
140
141
142
143 execution.setStatus(BatchStatus.STOPPING);
144 throw e;
145 }
146
147 jobRepository.updateExecutionContext(execution);
148
149 if (currentStepExecution.getStatus() == BatchStatus.STOPPING
150 || currentStepExecution.getStatus() == BatchStatus.STOPPED) {
151
152 execution.setStatus(BatchStatus.STOPPING);
153 throw new JobInterruptedException("Job interrupted by step execution");
154 }
155
156 }
157 else {
158
159 }
160
161 return currentStepExecution;
162 }
163
164
165
166
167
168
169
170 private boolean stepExecutionPartOfExistingJobExecution(JobExecution jobExecution, StepExecution stepExecution) {
171 return stepExecution != null && stepExecution.getJobExecutionId() != null
172 && stepExecution.getJobExecutionId().equals(jobExecution.getId());
173 }
174
175
176
177
178
179
180
181
182
183
184
185
186
187 private boolean shouldStart(StepExecution lastStepExecution, JobInstance jobInstance, Step step)
188 throws JobRestartException, StartLimitExceededException {
189
190 BatchStatus stepStatus;
191 if (lastStepExecution == null) {
192 stepStatus = BatchStatus.STARTING;
193 }
194 else {
195 stepStatus = lastStepExecution.getStatus();
196 }
197
198 if (stepStatus == BatchStatus.UNKNOWN) {
199 throw new JobRestartException("Cannot restart step from UNKNOWN status. "
200 + "The last execution ended with a failure that could not be rolled back, "
201 + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
202 }
203
204 if ((stepStatus == BatchStatus.COMPLETED && step.isAllowStartIfComplete() == false)
205 || stepStatus == BatchStatus.ABANDONED) {
206
207
208 logger.info("Step already complete or not restartable, so no action to execute: " + lastStepExecution);
209 return false;
210 }
211
212 if (jobRepository.getStepExecutionCount(jobInstance, step.getName()) < step.getStartLimit()) {
213
214 return true;
215 }
216 else {
217
218 throw new StartLimitExceededException("Maximum start limit exceeded for step: " + step.getName()
219 + "StartMax: " + step.getStartLimit());
220 }
221 }
222
223 }