1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.launch.support;
17
18 import java.util.ArrayList;
19 import java.util.Date;
20 import java.util.LinkedHashMap;
21 import java.util.LinkedHashSet;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.TreeSet;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.springframework.batch.core.BatchStatus;
30 import org.springframework.batch.core.Job;
31 import org.springframework.batch.core.JobExecution;
32 import org.springframework.batch.core.JobInstance;
33 import org.springframework.batch.core.JobParameters;
34 import org.springframework.batch.core.JobParametersIncrementer;
35 import org.springframework.batch.core.JobParametersInvalidException;
36 import org.springframework.batch.core.StepExecution;
37 import org.springframework.batch.core.UnexpectedJobExecutionException;
38 import org.springframework.batch.core.configuration.JobRegistry;
39 import org.springframework.batch.core.configuration.ListableJobLocator;
40 import org.springframework.batch.core.converter.DefaultJobParametersConverter;
41 import org.springframework.batch.core.converter.JobParametersConverter;
42 import org.springframework.batch.core.explore.JobExplorer;
43 import org.springframework.batch.core.launch.JobExecutionNotRunningException;
44 import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
45 import org.springframework.batch.core.launch.JobLauncher;
46 import org.springframework.batch.core.launch.JobOperator;
47 import org.springframework.batch.core.launch.JobParametersNotFoundException;
48 import org.springframework.batch.core.launch.NoSuchJobException;
49 import org.springframework.batch.core.launch.NoSuchJobExecutionException;
50 import org.springframework.batch.core.launch.NoSuchJobInstanceException;
51 import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
52 import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
53 import org.springframework.batch.core.repository.JobRepository;
54 import org.springframework.batch.core.repository.JobRestartException;
55 import org.springframework.batch.support.PropertiesConverter;
56 import org.springframework.beans.factory.InitializingBean;
57 import org.springframework.transaction.annotation.Transactional;
58 import org.springframework.util.Assert;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class SimpleJobOperator implements JobOperator, InitializingBean {
77
78
79
80
81 private static final String ILLEGAL_STATE_MSG = "Illegal state (only happens on a race condition): "
82 + "%s with name=%s and parameters=%s";
83
84 private ListableJobLocator jobRegistry;
85
86 private JobExplorer jobExplorer;
87
88 private JobLauncher jobLauncher;
89
90 private JobRepository jobRepository;
91
92 private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter();
93
94 private final Log logger = LogFactory.getLog(getClass());
95
96
97
98
99
100
101 @Override
102 public void afterPropertiesSet() throws Exception {
103 Assert.notNull(jobLauncher, "JobLauncher must be provided");
104 Assert.notNull(jobRegistry, "JobLocator must be provided");
105 Assert.notNull(jobExplorer, "JobExplorer must be provided");
106 Assert.notNull(jobRepository, "JobRepository must be provided");
107 }
108
109
110
111
112
113 public void setJobParametersConverter(JobParametersConverter jobParametersConverter) {
114 this.jobParametersConverter = jobParametersConverter;
115 }
116
117
118
119
120
121 public void setJobRegistry(ListableJobLocator jobRegistry) {
122 this.jobRegistry = jobRegistry;
123 }
124
125
126
127
128
129 public void setJobExplorer(JobExplorer jobExplorer) {
130 this.jobExplorer = jobExplorer;
131 }
132
133 public void setJobRepository(JobRepository jobRepository) {
134 this.jobRepository = jobRepository;
135 }
136
137
138
139
140
141 public void setJobLauncher(JobLauncher jobLauncher) {
142 this.jobLauncher = jobLauncher;
143 }
144
145
146
147
148
149
150 @Override
151 public List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException {
152 JobInstance jobInstance = jobExplorer.getJobInstance(instanceId);
153 if (jobInstance == null) {
154 throw new NoSuchJobInstanceException(String.format("No job instance with id=%d", instanceId));
155 }
156 List<Long> list = new ArrayList<Long>();
157 for (JobExecution jobExecution : jobExplorer.getJobExecutions(jobInstance)) {
158 list.add(jobExecution.getId());
159 }
160 return list;
161 }
162
163
164
165
166
167
168 @Override
169 public Set<String> getJobNames() {
170 return new TreeSet<String>(jobRegistry.getJobNames());
171 }
172
173
174
175
176
177
178 @Override
179 public List<Long> getJobInstances(String jobName, int start, int count) throws NoSuchJobException {
180 List<Long> list = new ArrayList<Long>();
181 for (JobInstance jobInstance : jobExplorer.getJobInstances(jobName, start, count)) {
182 list.add(jobInstance.getId());
183 }
184 if (list.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) {
185 throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName);
186 }
187 return list;
188 }
189
190
191
192
193
194
195
196
197 @Override
198 public String getParameters(long executionId) throws NoSuchJobExecutionException {
199 JobExecution jobExecution = findExecutionById(executionId);
200
201 return PropertiesConverter.propertiesToString(jobParametersConverter.getProperties(jobExecution
202 .getJobInstance().getJobParameters()));
203 }
204
205
206
207
208
209
210
211
212 @Override
213 public Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException {
214 Set<Long> set = new LinkedHashSet<Long>();
215 for (JobExecution jobExecution : jobExplorer.findRunningJobExecutions(jobName)) {
216 set.add(jobExecution.getId());
217 }
218 if (set.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) {
219 throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName);
220 }
221 return set;
222 }
223
224
225
226
227
228
229
230
231 @Override
232 public Map<Long, String> getStepExecutionSummaries(long executionId) throws NoSuchJobExecutionException {
233 JobExecution jobExecution = findExecutionById(executionId);
234
235 Map<Long, String> map = new LinkedHashMap<Long, String>();
236 for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
237 map.put(stepExecution.getId(), stepExecution.toString());
238 }
239 return map;
240 }
241
242
243
244
245
246
247
248
249 @Override
250 public String getSummary(long executionId) throws NoSuchJobExecutionException {
251 JobExecution jobExecution = findExecutionById(executionId);
252 return jobExecution.toString();
253 }
254
255
256
257
258
259
260
261 @Override
262 public Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {
263
264 logger.info("Checking status of job execution with id=" + executionId);
265
266 JobExecution jobExecution = findExecutionById(executionId);
267
268 String jobName = jobExecution.getJobInstance().getJobName();
269 Job job = jobRegistry.getJob(jobName);
270 JobParameters parameters = jobExecution.getJobInstance().getJobParameters();
271
272 logger.info(String.format("Attempting to resume job with name=%s and parameters=%s", jobName, parameters));
273 try {
274 return jobLauncher.run(job, parameters).getId();
275 }
276 catch (JobExecutionAlreadyRunningException e) {
277 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
278 jobName, parameters), e);
279 }
280
281 }
282
283
284
285
286
287
288
289
290 @Override
291 public Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException {
292
293 logger.info("Checking status of job with name=" + jobName);
294
295 JobParameters jobParameters = jobParametersConverter.getJobParameters(PropertiesConverter
296 .stringToProperties(parameters));
297
298 if (jobRepository.isJobInstanceExists(jobName, jobParameters)) {
299 throw new JobInstanceAlreadyExistsException(String.format(
300 "Cannot start a job instance that already exists with name=%s and parameters=%s", jobName,
301 parameters));
302 }
303
304 Job job = jobRegistry.getJob(jobName);
305
306 logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters));
307 try {
308 return jobLauncher.run(job, jobParameters).getId();
309 }
310 catch (JobExecutionAlreadyRunningException e) {
311 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
312 jobName, parameters), e);
313 }
314 catch (JobRestartException e) {
315 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName,
316 parameters), e);
317 }
318 catch (JobInstanceAlreadyCompleteException e) {
319 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already complete", jobName,
320 parameters), e);
321 }
322
323 }
324
325
326
327
328
329
330 @Override
331 public Long startNextInstance(String jobName) throws NoSuchJobException, JobParametersNotFoundException,
332 UnexpectedJobExecutionException, JobParametersInvalidException {
333
334 logger.info("Locating parameters for next instance of job with name=" + jobName);
335
336 Job job = jobRegistry.getJob(jobName);
337 List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobName, 0, 1);
338
339 JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
340 if (incrementer == null) {
341 throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobName);
342 }
343
344 JobParameters parameters;
345 if (lastInstances.isEmpty()) {
346 parameters = incrementer.getNext(new JobParameters());
347 if (parameters == null) {
348 throw new JobParametersNotFoundException("No bootstrap parameters found for job=" + jobName);
349 }
350 }
351 else {
352 parameters = incrementer.getNext(lastInstances.get(0).getJobParameters());
353 }
354
355 logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters));
356 try {
357 return jobLauncher.run(job, parameters).getId();
358 }
359 catch (JobExecutionAlreadyRunningException e) {
360 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already running", jobName,
361 parameters), e);
362 }
363 catch (JobRestartException e) {
364 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName,
365 parameters), e);
366 }
367 catch (JobInstanceAlreadyCompleteException e) {
368 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job instance already complete",
369 jobName, parameters), e);
370 }
371
372 }
373
374
375
376
377
378
379
380 @Override
381 @Transactional
382 public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
383
384 JobExecution jobExecution = findExecutionById(executionId);
385
386
387
388 BatchStatus status = jobExecution.getStatus();
389 if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) {
390 throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution);
391 }
392 jobExecution.setStatus(BatchStatus.STOPPING);
393 jobRepository.update(jobExecution);
394
395 return true;
396 }
397
398 @Override
399 public JobExecution abandon(long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException {
400 JobExecution jobExecution = findExecutionById(jobExecutionId);
401
402 if (jobExecution.getStatus().isLessThan(BatchStatus.STOPPING)) {
403 throw new JobExecutionAlreadyRunningException(
404 "JobExecution is running or complete and therefore cannot be aborted");
405 }
406
407 logger.info("Aborting job execution: " + jobExecution);
408 jobExecution.upgradeStatus(BatchStatus.ABANDONED);
409 jobExecution.setEndTime(new Date());
410 jobRepository.update(jobExecution);
411
412 return jobExecution;
413 }
414
415 private JobExecution findExecutionById(long executionId) throws NoSuchJobExecutionException {
416 JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
417
418 if (jobExecution == null) {
419 throw new NoSuchJobExecutionException("No JobExecution found for id: [" + executionId + "]");
420 }
421 return jobExecution;
422
423 }
424 }