1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.repository.dao;
18
19 import java.sql.ResultSet;
20 import java.sql.SQLException;
21 import java.sql.Types;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Set;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.springframework.batch.core.BatchStatus;
29 import org.springframework.batch.core.ExitStatus;
30 import org.springframework.batch.core.JobExecution;
31 import org.springframework.batch.core.JobInstance;
32 import org.springframework.beans.factory.InitializingBean;
33 import org.springframework.dao.EmptyResultDataAccessException;
34 import org.springframework.dao.OptimisticLockingFailureException;
35 import org.springframework.jdbc.core.RowCallbackHandler;
36 import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
37 import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
38 import org.springframework.util.Assert;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean {
55
56 private static final Log logger = LogFactory.getLog(JdbcJobExecutionDao.class);
57
58 private static final String SAVE_JOB_EXECUTION = "INSERT into %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, JOB_INSTANCE_ID, START_TIME, "
59 + "END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
60
61 private static final String CHECK_JOB_EXECUTION_EXISTS = "SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID = ?";
62
63 private static final String GET_STATUS = "SELECT STATUS from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";
64
65 private static final String UPDATE_JOB_EXECUTION = "UPDATE %PREFIX%JOB_EXECUTION set START_TIME = ?, END_TIME = ?, "
66 + " STATUS = ?, EXIT_CODE = ?, EXIT_MESSAGE = ?, VERSION = ?, CREATE_TIME = ?, LAST_UPDATED = ? where JOB_EXECUTION_ID = ? and VERSION = ?";
67
68 private static final String FIND_JOB_EXECUTIONS = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
69 + " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc";
70
71 private static final String GET_LAST_EXECUTION = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION "
72 + "from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? and JOB_EXECUTION_ID in (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E.JOB_INSTANCE_ID = E2.JOB_INSTANCE_ID)";
73
74 private static final String GET_EXECUTION_BY_ID = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
75 + " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";
76
77 private static final String GET_RUNNING_EXECUTIONS = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, "
78 + "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc";
79
80 private static final String CURRENT_VERSION_JOB_EXECUTION = "SELECT VERSION FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID=?";
81
82 private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH;
83
84 private DataFieldMaxValueIncrementer jobExecutionIncrementer;
85
86
87
88
89
90
91 public void setExitMessageLength(int exitMessageLength) {
92 this.exitMessageLength = exitMessageLength;
93 }
94
95
96
97
98
99
100
101 public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
102 this.jobExecutionIncrementer = jobExecutionIncrementer;
103 }
104
105 @Override
106 public void afterPropertiesSet() throws Exception {
107 super.afterPropertiesSet();
108 Assert.notNull(jobExecutionIncrementer, "The jobExecutionIncrementer must not be null.");
109 }
110
111 @Override
112 public List<JobExecution> findJobExecutions(final JobInstance job) {
113
114 Assert.notNull(job, "Job cannot be null.");
115 Assert.notNull(job.getId(), "Job Id cannot be null.");
116
117 return getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTIONS), new JobExecutionRowMapper(job), job.getId());
118 }
119
120
121
122
123
124
125
126
127
128
129
130 @Override
131 public void saveJobExecution(JobExecution jobExecution) {
132
133 validateJobExecution(jobExecution);
134
135 jobExecution.incrementVersion();
136
137 jobExecution.setId(jobExecutionIncrementer.nextLongValue());
138 Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(),
139 jobExecution.getStartTime(), jobExecution.getEndTime(), jobExecution.getStatus().toString(),
140 jobExecution.getExitStatus().getExitCode(), jobExecution.getExitStatus().getExitDescription(),
141 jobExecution.getVersion(), jobExecution.getCreateTime(), jobExecution.getLastUpdated() };
142 getJdbcTemplate().update(
143 getQuery(SAVE_JOB_EXECUTION),
144 parameters,
145 new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR,
146 Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP });
147 }
148
149
150
151
152
153
154
155
156 private void validateJobExecution(JobExecution jobExecution) {
157
158 Assert.notNull(jobExecution);
159 Assert.notNull(jobExecution.getJobId(), "JobExecution Job-Id cannot be null.");
160 Assert.notNull(jobExecution.getStatus(), "JobExecution status cannot be null.");
161 Assert.notNull(jobExecution.getCreateTime(), "JobExecution create time cannot be null");
162 }
163
164
165
166
167
168
169
170
171
172 @Override
173 public void updateJobExecution(JobExecution jobExecution) {
174
175 validateJobExecution(jobExecution);
176
177 Assert.notNull(jobExecution.getId(),
178 "JobExecution ID cannot be null. JobExecution must be saved before it can be updated");
179
180 Assert.notNull(jobExecution.getVersion(),
181 "JobExecution version cannot be null. JobExecution must be saved before it can be updated");
182
183 synchronized (jobExecution) {
184 Integer version = jobExecution.getVersion() + 1;
185
186 String exitDescription = jobExecution.getExitStatus().getExitDescription();
187 if (exitDescription != null && exitDescription.length() > exitMessageLength) {
188 exitDescription = exitDescription.substring(0, exitMessageLength);
189 logger.debug("Truncating long message before update of JobExecution: " + jobExecution);
190 }
191 Object[] parameters = new Object[] { jobExecution.getStartTime(), jobExecution.getEndTime(),
192 jobExecution.getStatus().toString(), jobExecution.getExitStatus().getExitCode(), exitDescription,
193 version, jobExecution.getCreateTime(), jobExecution.getLastUpdated(), jobExecution.getId(),
194 jobExecution.getVersion() };
195
196
197
198
199
200 if (getJdbcTemplate().queryForInt(getQuery(CHECK_JOB_EXECUTION_EXISTS),
201 new Object[] { jobExecution.getId() }) != 1) {
202 throw new NoSuchObjectException("Invalid JobExecution, ID " + jobExecution.getId() + " not found.");
203 }
204
205 int count = getJdbcTemplate().update(
206 getQuery(UPDATE_JOB_EXECUTION),
207 parameters,
208 new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
209 Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.BIGINT, Types.INTEGER });
210
211
212 if (count == 0) {
213 int curentVersion = getJdbcTemplate().queryForInt(getQuery(CURRENT_VERSION_JOB_EXECUTION),
214 new Object[] { jobExecution.getId() });
215 throw new OptimisticLockingFailureException("Attempt to update job execution id="
216 + jobExecution.getId() + " with wrong version (" + jobExecution.getVersion()
217 + "), where current version is " + curentVersion);
218 }
219
220 jobExecution.incrementVersion();
221 }
222 }
223
224 @Override
225 public JobExecution getLastJobExecution(JobInstance jobInstance) {
226
227 Long id = jobInstance.getId();
228
229 List<JobExecution> executions = getJdbcTemplate().query(getQuery(GET_LAST_EXECUTION),
230 new JobExecutionRowMapper(jobInstance), id);
231
232 Assert.state(executions.size() <= 1, "There must be at most one latest job execution");
233
234 if (executions.isEmpty()) {
235 return null;
236 }
237 else {
238 return executions.get(0);
239 }
240 }
241
242
243
244
245
246
247
248 @Override
249 public JobExecution getJobExecution(Long executionId) {
250 try {
251 JobExecution jobExecution = getJdbcTemplate().queryForObject(getQuery(GET_EXECUTION_BY_ID),
252 new JobExecutionRowMapper(), executionId);
253 return jobExecution;
254 }
255 catch (EmptyResultDataAccessException e) {
256 return null;
257 }
258 }
259
260
261
262
263
264
265
266 @Override
267 public Set<JobExecution> findRunningJobExecutions(String jobName) {
268
269 final Set<JobExecution> result = new HashSet<JobExecution>();
270 RowCallbackHandler handler = new RowCallbackHandler() {
271 @Override
272 public void processRow(ResultSet rs) throws SQLException {
273 JobExecutionRowMapper mapper = new JobExecutionRowMapper();
274 result.add(mapper.mapRow(rs, 0));
275 }
276 };
277 getJdbcTemplate().query(getQuery(GET_RUNNING_EXECUTIONS), new Object[] { jobName }, handler);
278
279 return result;
280 }
281
282 @Override
283 public void synchronizeStatus(JobExecution jobExecution) {
284 int currentVersion = getJdbcTemplate().queryForInt(getQuery(CURRENT_VERSION_JOB_EXECUTION),
285 jobExecution.getId());
286
287 if (currentVersion != jobExecution.getVersion().intValue()) {
288 String status = getJdbcTemplate().queryForObject(getQuery(GET_STATUS), String.class, jobExecution.getId());
289 jobExecution.upgradeStatus(BatchStatus.valueOf(status));
290 jobExecution.setVersion(currentVersion);
291 }
292 }
293
294
295
296
297
298
299
300 private static class JobExecutionRowMapper implements ParameterizedRowMapper<JobExecution> {
301
302 private JobInstance jobInstance;
303
304 public JobExecutionRowMapper() {
305 }
306
307 public JobExecutionRowMapper(JobInstance jobInstance) {
308 this.jobInstance = jobInstance;
309 }
310
311 @Override
312 public JobExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
313 Long id = rs.getLong(1);
314 JobExecution jobExecution;
315
316 if (jobInstance == null) {
317 jobExecution = new JobExecution(id);
318 }
319 else {
320 jobExecution = new JobExecution(jobInstance, id);
321 }
322
323 jobExecution.setStartTime(rs.getTimestamp(2));
324 jobExecution.setEndTime(rs.getTimestamp(3));
325 jobExecution.setStatus(BatchStatus.valueOf(rs.getString(4)));
326 jobExecution.setExitStatus(new ExitStatus(rs.getString(5), rs.getString(6)));
327 jobExecution.setCreateTime(rs.getTimestamp(7));
328 jobExecution.setLastUpdated(rs.getTimestamp(8));
329 jobExecution.setVersion(rs.getInt(9));
330 return jobExecution;
331 }
332
333 }
334 }