1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.repository.dao;
18
19 import java.io.UnsupportedEncodingException;
20 import java.math.BigInteger;
21 import java.security.MessageDigest;
22 import java.security.NoSuchAlgorithmException;
23 import java.sql.ResultSet;
24 import java.sql.SQLException;
25 import java.sql.Timestamp;
26 import java.sql.Types;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33
34 import org.springframework.batch.core.JobExecution;
35 import org.springframework.batch.core.JobInstance;
36 import org.springframework.batch.core.JobParameter;
37 import org.springframework.batch.core.JobParameter.ParameterType;
38 import org.springframework.batch.core.JobParameters;
39 import org.springframework.beans.factory.InitializingBean;
40 import org.springframework.dao.DataAccessException;
41 import org.springframework.dao.EmptyResultDataAccessException;
42 import org.springframework.jdbc.core.ResultSetExtractor;
43 import org.springframework.jdbc.core.RowCallbackHandler;
44 import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
45 import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
46 import org.springframework.util.Assert;
47 import org.springframework.util.StringUtils;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements
63 JobInstanceDao, InitializingBean {
64
65 private static final String CREATE_JOB_INSTANCE = "INSERT into %PREFIX%JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION)"
66 + " values (?, ?, ?, ?)";
67
68 private static final String CREATE_JOB_PARAMETERS = "INSERT into %PREFIX%JOB_PARAMS(JOB_INSTANCE_ID, KEY_NAME, TYPE_CD, "
69 + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL) values (?, ?, ?, ?, ?, ?, ?)";
70
71 private static final String FIND_JOBS_WITH_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ?";
72
73 private static final String FIND_JOBS_WITH_KEY = FIND_JOBS_WITH_NAME
74 + " and JOB_KEY = ?";
75
76 private static final String FIND_JOBS_WITH_EMPTY_KEY = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? and (JOB_KEY = ? OR JOB_KEY is NULL)";
77
78 private static final String GET_JOB_FROM_ID = "SELECT JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION from %PREFIX%JOB_INSTANCE where JOB_INSTANCE_ID = ?";
79
80 private static final String GET_JOB_FROM_EXECUTION_ID = "SELECT ji.JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, ji.VERSION from %PREFIX%JOB_INSTANCE ji, "
81 + "%PREFIX%JOB_EXECUTION je where JOB_EXECUTION_ID = ? and ji.JOB_INSTANCE_ID = je.JOB_INSTANCE_ID";
82
83 private static final String FIND_PARAMS_FROM_ID = "SELECT JOB_INSTANCE_ID, KEY_NAME, TYPE_CD, "
84 + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL from %PREFIX%JOB_PARAMS where JOB_INSTANCE_ID = ?";
85
86 private static final String FIND_JOB_NAMES = "SELECT distinct JOB_NAME from %PREFIX%JOB_INSTANCE order by JOB_NAME";
87
88 private static final String FIND_LAST_JOBS_BY_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? order by JOB_INSTANCE_ID desc";
89
90 private DataFieldMaxValueIncrementer jobIncrementer;
91
92
93
94
95
96
97
98
99
100
101 @Override
102 public JobInstance createJobInstance(String jobName,
103 JobParameters jobParameters) {
104
105 Assert.notNull(jobName, "Job name must not be null.");
106 Assert.notNull(jobParameters, "JobParameters must not be null.");
107
108 Assert.state(getJobInstance(jobName, jobParameters) == null,
109 "JobInstance must not already exist");
110
111 Long jobId = jobIncrementer.nextLongValue();
112
113 JobInstance jobInstance = new JobInstance(jobId, jobParameters, jobName);
114 jobInstance.incrementVersion();
115
116 Object[] parameters = new Object[] { jobId, jobName,
117 createJobKey(jobParameters), jobInstance.getVersion() };
118 getJdbcTemplate().update(
119 getQuery(CREATE_JOB_INSTANCE),
120 parameters,
121 new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR,
122 Types.INTEGER });
123
124 insertJobParameters(jobId, jobParameters);
125
126 return jobInstance;
127 }
128
129 protected String createJobKey(JobParameters jobParameters) {
130
131 Map<String, JobParameter> props = jobParameters.getParameters();
132 StringBuffer stringBuffer = new StringBuffer();
133 List<String> keys = new ArrayList<String>(props.keySet());
134 Collections.sort(keys);
135 for (String key : keys) {
136 JobParameter jobParameter = props.get(key);
137 String value = jobParameter.getValue()==null ? "" : jobParameter.toString();
138 stringBuffer.append(key + "=" + value + ";");
139 }
140
141 MessageDigest digest;
142 try {
143 digest = MessageDigest.getInstance("MD5");
144 } catch (NoSuchAlgorithmException e) {
145 throw new IllegalStateException(
146 "MD5 algorithm not available. Fatal (should be in the JDK).");
147 }
148
149 try {
150 byte[] bytes = digest.digest(stringBuffer.toString().getBytes(
151 "UTF-8"));
152 return String.format("%032x", new BigInteger(1, bytes));
153 } catch (UnsupportedEncodingException e) {
154 throw new IllegalStateException(
155 "UTF-8 encoding not available. Fatal (should be in the JDK).");
156 }
157 }
158
159
160
161
162
163
164 private void insertJobParameters(Long jobId, JobParameters jobParameters) {
165
166 for (Entry<String, JobParameter> entry : jobParameters.getParameters()
167 .entrySet()) {
168 JobParameter jobParameter = entry.getValue();
169 insertParameter(jobId, jobParameter.getType(), entry.getKey(),
170 jobParameter.getValue());
171 }
172 }
173
174
175
176
177
178 private void insertParameter(Long jobId, ParameterType type, String key,
179 Object value) {
180
181 Object[] args = new Object[0];
182 int[] argTypes = new int[] { Types.BIGINT, Types.VARCHAR,
183 Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.BIGINT,
184 Types.DOUBLE };
185
186 if (type == ParameterType.STRING) {
187 args = new Object[] { jobId, key, type, value, new Timestamp(0L),
188 0L, 0D };
189 } else if (type == ParameterType.LONG) {
190 args = new Object[] { jobId, key, type, "", new Timestamp(0L),
191 value, new Double(0) };
192 } else if (type == ParameterType.DOUBLE) {
193 args = new Object[] { jobId, key, type, "", new Timestamp(0L), 0L,
194 value };
195 } else if (type == ParameterType.DATE) {
196 args = new Object[] { jobId, key, type, "", value, 0L, 0D };
197 }
198
199 getJdbcTemplate().update(getQuery(CREATE_JOB_PARAMETERS), args, argTypes);
200 }
201
202
203
204
205
206
207
208
209
210 @Override
211 public JobInstance getJobInstance(final String jobName,
212 final JobParameters jobParameters) {
213
214 Assert.notNull(jobName, "Job name must not be null.");
215 Assert.notNull(jobParameters, "JobParameters must not be null.");
216
217 String jobKey = createJobKey(jobParameters);
218
219 ParameterizedRowMapper<JobInstance> rowMapper = new JobInstanceRowMapper(
220 jobParameters);
221
222 List<JobInstance> instances;
223 if (StringUtils.hasLength(jobKey)) {
224 instances = getJdbcTemplate().query(getQuery(FIND_JOBS_WITH_KEY),
225 rowMapper, jobName, jobKey);
226 } else {
227 instances = getJdbcTemplate().query(
228 getQuery(FIND_JOBS_WITH_EMPTY_KEY), rowMapper, jobName,
229 jobKey);
230 }
231
232 if (instances.isEmpty()) {
233 return null;
234 } else {
235 Assert.state(instances.size() == 1);
236 return instances.get(0);
237 }
238 }
239
240
241
242
243
244
245
246
247 @Override
248 public JobInstance getJobInstance(Long instanceId) {
249
250 try {
251 return getJdbcTemplate().queryForObject(getQuery(GET_JOB_FROM_ID),
252 new JobInstanceRowMapper(), instanceId);
253 } catch (EmptyResultDataAccessException e) {
254 return null;
255 }
256
257 }
258
259
260
261
262
263 private JobParameters getJobParameters(Long instanceId) {
264 final Map<String, JobParameter> map = new HashMap<String, JobParameter>();
265 RowCallbackHandler handler = new RowCallbackHandler() {
266 @Override
267 public void processRow(ResultSet rs) throws SQLException {
268 ParameterType type = ParameterType.valueOf(rs.getString(3));
269 JobParameter value = null;
270 if (type == ParameterType.STRING) {
271 value = new JobParameter(rs.getString(4));
272 } else if (type == ParameterType.LONG) {
273 value = new JobParameter(rs.getLong(6));
274 } else if (type == ParameterType.DOUBLE) {
275 value = new JobParameter(rs.getDouble(7));
276 } else if (type == ParameterType.DATE) {
277 value = new JobParameter(rs.getTimestamp(5));
278 }
279
280 map.put(rs.getString(2), value);
281 }
282 };
283 getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID), new Object[] { instanceId }, handler);
284 return new JobParameters(map);
285 }
286
287
288
289
290
291
292
293
294 @Override
295 public List<String> getJobNames() {
296 return getJdbcTemplate().query(getQuery(FIND_JOB_NAMES),
297 new ParameterizedRowMapper<String>() {
298 @Override
299 public String mapRow(ResultSet rs, int rowNum)
300 throws SQLException {
301 return rs.getString(1);
302 }
303 });
304 }
305
306
307
308
309
310
311
312 @Override
313 @SuppressWarnings("rawtypes")
314 public List<JobInstance> getJobInstances(String jobName, final int start,
315 final int count) {
316
317 ResultSetExtractor extractor = new ResultSetExtractor() {
318
319 private List<JobInstance> list = new ArrayList<JobInstance>();
320
321 @Override
322 public Object extractData(ResultSet rs) throws SQLException,
323 DataAccessException {
324 int rowNum = 0;
325 while (rowNum < start && rs.next()) {
326 rowNum++;
327 }
328 while (rowNum < start + count && rs.next()) {
329 ParameterizedRowMapper<JobInstance> rowMapper = new JobInstanceRowMapper();
330 list.add(rowMapper.mapRow(rs, rowNum));
331 rowNum++;
332 }
333 return list;
334 }
335
336 };
337
338 @SuppressWarnings("unchecked")
339 List<JobInstance> result = (List<JobInstance>) getJdbcTemplate().query(getQuery(FIND_LAST_JOBS_BY_NAME),
340 new Object[] { jobName }, extractor);
341
342 return result;
343 }
344
345
346
347
348
349
350
351
352 @Override
353 public JobInstance getJobInstance(JobExecution jobExecution) {
354
355 try {
356 return getJdbcTemplate().queryForObject(
357 getQuery(GET_JOB_FROM_EXECUTION_ID),
358 new JobInstanceRowMapper(), jobExecution.getId());
359 } catch (EmptyResultDataAccessException e) {
360 return null;
361 }
362 }
363
364
365
366
367
368
369
370
371 public void setJobIncrementer(DataFieldMaxValueIncrementer jobIncrementer) {
372 this.jobIncrementer = jobIncrementer;
373 }
374
375 @Override
376 public void afterPropertiesSet() throws Exception {
377 super.afterPropertiesSet();
378 Assert.notNull(jobIncrementer);
379 }
380
381
382
383
384
385 private final class JobInstanceRowMapper implements
386 ParameterizedRowMapper<JobInstance> {
387
388 private JobParameters jobParameters;
389
390 public JobInstanceRowMapper() {
391 }
392
393 public JobInstanceRowMapper(JobParameters jobParameters) {
394 this.jobParameters = jobParameters;
395 }
396
397 @Override
398 public JobInstance mapRow(ResultSet rs, int rowNum) throws SQLException {
399 Long id = rs.getLong(1);
400 if (jobParameters == null) {
401 jobParameters = getJobParameters(id);
402 }
403 JobInstance jobInstance = new JobInstance(rs.getLong(1),
404 jobParameters, rs.getString(2));
405
406 jobInstance.incrementVersion();
407 return jobInstance;
408 }
409 }
410 }