1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.repository.support;
18
19 import java.util.ArrayList;
20 import java.util.Date;
21 import java.util.List;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.springframework.batch.core.BatchStatus;
26 import org.springframework.batch.core.JobExecution;
27 import org.springframework.batch.core.JobInstance;
28 import org.springframework.batch.core.JobParameters;
29 import org.springframework.batch.core.StepExecution;
30 import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
31 import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
32 import org.springframework.batch.core.repository.JobRepository;
33 import org.springframework.batch.core.repository.JobRestartException;
34 import org.springframework.batch.core.repository.dao.ExecutionContextDao;
35 import org.springframework.batch.core.repository.dao.JobExecutionDao;
36 import org.springframework.batch.core.repository.dao.JobInstanceDao;
37 import org.springframework.batch.core.repository.dao.StepExecutionDao;
38 import org.springframework.batch.item.ExecutionContext;
39 import org.springframework.util.Assert;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class SimpleJobRepository implements JobRepository {
59
60 private static final Log logger = LogFactory.getLog(SimpleJobRepository.class);
61
62 private JobInstanceDao jobInstanceDao;
63
64 private JobExecutionDao jobExecutionDao;
65
66 private StepExecutionDao stepExecutionDao;
67
68 private ExecutionContextDao ecDao;
69
70
71
72
73
74 SimpleJobRepository() {
75 }
76
77 public SimpleJobRepository(JobInstanceDao jobInstanceDao, JobExecutionDao jobExecutionDao,
78 StepExecutionDao stepExecutionDao, ExecutionContextDao ecDao) {
79 super();
80 this.jobInstanceDao = jobInstanceDao;
81 this.jobExecutionDao = jobExecutionDao;
82 this.stepExecutionDao = stepExecutionDao;
83 this.ecDao = ecDao;
84 }
85
86 @Override
87 public boolean isJobInstanceExists(String jobName, JobParameters jobParameters) {
88 return jobInstanceDao.getJobInstance(jobName, jobParameters) != null;
89 }
90
91 @Override
92 public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
93 throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
94
95 Assert.notNull(jobName, "Job name must not be null.");
96 Assert.notNull(jobParameters, "JobParameters must not be null.");
97
98
99
100
101
102
103
104
105
106
107 JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
108 ExecutionContext executionContext;
109
110
111 if (jobInstance != null) {
112
113 List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
114
115
116 for (JobExecution execution : executions) {
117 if (execution.isRunning()) {
118 throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
119 + jobInstance);
120 }
121
122 BatchStatus status = execution.getStatus();
123 if (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED) {
124 throw new JobInstanceAlreadyCompleteException(
125 "A job instance already exists and is complete for parameters=" + jobParameters
126 + ". If you want to run this job again, change the parameters.");
127 }
128 }
129 executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
130 }
131 else {
132
133 jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
134 executionContext = new ExecutionContext();
135 }
136
137 JobExecution jobExecution = new JobExecution(jobInstance);
138 jobExecution.setExecutionContext(executionContext);
139 jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
140
141
142
143 jobExecutionDao.saveJobExecution(jobExecution);
144 ecDao.saveExecutionContext(jobExecution);
145
146 return jobExecution;
147
148 }
149
150 @Override
151 public void update(JobExecution jobExecution) {
152
153 Assert.notNull(jobExecution, "JobExecution cannot be null.");
154 Assert.notNull(jobExecution.getJobId(), "JobExecution must have a Job ID set.");
155 Assert.notNull(jobExecution.getId(), "JobExecution must be already saved (have an id assigned).");
156
157 jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
158 jobExecutionDao.updateJobExecution(jobExecution);
159 }
160
161 @Override
162 public void add(StepExecution stepExecution) {
163 validateStepExecution(stepExecution);
164
165 stepExecution.setLastUpdated(new Date(System.currentTimeMillis()));
166 stepExecutionDao.saveStepExecution(stepExecution);
167 ecDao.saveExecutionContext(stepExecution);
168 }
169
170 @Override
171 public void update(StepExecution stepExecution) {
172 validateStepExecution(stepExecution);
173 Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)");
174
175 stepExecution.setLastUpdated(new Date(System.currentTimeMillis()));
176 stepExecutionDao.updateStepExecution(stepExecution);
177 checkForInterruption(stepExecution);
178 }
179
180 private void validateStepExecution(StepExecution stepExecution) {
181 Assert.notNull(stepExecution, "StepExecution cannot be null.");
182 Assert.notNull(stepExecution.getStepName(), "StepExecution's step name cannot be null.");
183 Assert.notNull(stepExecution.getJobExecutionId(), "StepExecution must belong to persisted JobExecution");
184 }
185
186 @Override
187 public void updateExecutionContext(StepExecution stepExecution) {
188 validateStepExecution(stepExecution);
189 Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)");
190 ecDao.updateExecutionContext(stepExecution);
191 }
192
193 @Override
194 public void updateExecutionContext(JobExecution jobExecution) {
195 ecDao.updateExecutionContext(jobExecution);
196 }
197
198 @Override
199 public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
200 List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
201 List<StepExecution> stepExecutions = new ArrayList<StepExecution>(jobExecutions.size());
202 for (JobExecution jobExecution : jobExecutions) {
203 stepExecutionDao.addStepExecutions(jobExecution);
204 for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
205 if (stepName.equals(stepExecution.getStepName())) {
206 stepExecutions.add(stepExecution);
207 }
208 }
209 }
210 StepExecution latest = null;
211 for (StepExecution stepExecution : stepExecutions) {
212 if (latest == null) {
213 latest = stepExecution;
214 }
215 if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) {
216 latest = stepExecution;
217 }
218 }
219 if (latest != null) {
220 ExecutionContext executionContext = ecDao.getExecutionContext(latest);
221 latest.setExecutionContext(executionContext);
222 }
223 return latest;
224 }
225
226
227
228
229 @Override
230 public int getStepExecutionCount(JobInstance jobInstance, String stepName) {
231 int count = 0;
232 List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
233 for (JobExecution jobExecution : jobExecutions) {
234 stepExecutionDao.addStepExecutions(jobExecution);
235 for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
236 if (stepName.equals(stepExecution.getStepName())) {
237 count++;
238 }
239 }
240 }
241 return count;
242 }
243
244
245
246
247
248
249
250
251
252 private void checkForInterruption(StepExecution stepExecution) {
253 JobExecution jobExecution = stepExecution.getJobExecution();
254 jobExecutionDao.synchronizeStatus(jobExecution);
255 if (jobExecution.isStopping()) {
256 logger.info("Parent JobExecution is stopped, so passing message on to StepExecution");
257 stepExecution.setTerminateOnly();
258 }
259 }
260
261 @Override
262 public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
263 JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
264 if (jobInstance == null) {
265 return null;
266 }
267 JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance);
268
269 if (jobExecution != null) {
270 jobExecution.setExecutionContext(ecDao.getExecutionContext(jobExecution));
271 }
272 return jobExecution;
273
274 }
275
276
277 }