1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.launch.support;
17
18 import java.io.BufferedReader;
19 import java.io.IOException;
20 import java.io.InputStreamReader;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29 import java.util.Set;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.springframework.batch.core.BatchStatus;
34 import org.springframework.batch.core.ExitStatus;
35 import org.springframework.batch.core.Job;
36 import org.springframework.batch.core.JobExecution;
37 import org.springframework.batch.core.JobInstance;
38 import org.springframework.batch.core.JobParameter;
39 import org.springframework.batch.core.JobParameters;
40 import org.springframework.batch.core.JobParametersIncrementer;
41 import org.springframework.batch.core.configuration.JobLocator;
42 import org.springframework.batch.core.converter.DefaultJobParametersConverter;
43 import org.springframework.batch.core.converter.JobParametersConverter;
44 import org.springframework.batch.core.explore.JobExplorer;
45 import org.springframework.batch.core.launch.JobExecutionNotFailedException;
46 import org.springframework.batch.core.launch.JobExecutionNotRunningException;
47 import org.springframework.batch.core.launch.JobExecutionNotStoppedException;
48 import org.springframework.batch.core.launch.JobLauncher;
49 import org.springframework.batch.core.launch.JobParametersNotFoundException;
50 import org.springframework.batch.core.repository.JobRepository;
51 import org.springframework.beans.factory.BeanDefinitionStoreException;
52 import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
53 import org.springframework.context.ConfigurableApplicationContext;
54 import org.springframework.context.support.ClassPathXmlApplicationContext;
55 import org.springframework.util.Assert;
56 import org.springframework.util.StringUtils;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160 public class CommandLineJobRunner {
161
162 protected static final Log logger = LogFactory.getLog(CommandLineJobRunner.class);
163
164 private ExitCodeMapper exitCodeMapper = new SimpleJvmExitCodeMapper();
165
166 private JobLauncher launcher;
167
168 private JobLocator jobLocator;
169
170
171 private static SystemExiter systemExiter = new JvmSystemExiter();
172
173 private static String message = "";
174
175 private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter();
176
177 private JobExplorer jobExplorer;
178
179 private JobRepository jobRepository;
180
181
182
183
184
185
186 public void setLauncher(JobLauncher launcher) {
187 this.launcher = launcher;
188 }
189
190
191
192
193 public void setJobRepository(JobRepository jobRepository) {
194 this.jobRepository = jobRepository;
195 }
196
197
198
199
200
201
202 public void setJobExplorer(JobExplorer jobExplorer) {
203 this.jobExplorer = jobExplorer;
204 }
205
206
207
208
209
210
211 public void setExitCodeMapper(ExitCodeMapper exitCodeMapper) {
212 this.exitCodeMapper = exitCodeMapper;
213 }
214
215
216
217
218
219
220
221
222 public static void presetSystemExiter(SystemExiter systemExiter) {
223 CommandLineJobRunner.systemExiter = systemExiter;
224 }
225
226
227
228
229
230
231
232
233 public static String getErrorMessage() {
234 return message;
235 }
236
237
238
239
240
241
242 public void setSystemExiter(SystemExiter systemExiter) {
243 CommandLineJobRunner.systemExiter = systemExiter;
244 }
245
246
247
248
249
250
251 public void setJobParametersConverter(JobParametersConverter jobParametersConverter) {
252 this.jobParametersConverter = jobParametersConverter;
253 }
254
255
256
257
258
259
260 public void exit(int status) {
261 systemExiter.exit(status);
262 }
263
264
265
266
267
268 public void setJobLocator(JobLocator jobLocator) {
269 this.jobLocator = jobLocator;
270 }
271
272
273
274
275
276
277 int start(String jobPath, String jobIdentifier, String[] parameters, Set<String> opts) {
278
279 ConfigurableApplicationContext context = null;
280
281 try {
282 context = new ClassPathXmlApplicationContext(jobPath);
283 context.getAutowireCapableBeanFactory().autowireBeanProperties(this,
284 AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, false);
285
286 Assert.state(launcher != null, "A JobLauncher must be provided. Please add one to the configuration.");
287 if (opts.contains("-restart") || opts.contains("-next")) {
288 Assert.state(jobExplorer != null,
289 "A JobExplorer must be provided for a restart or start next operation. Please add one to the configuration.");
290 }
291
292 String jobName = jobIdentifier;
293
294 JobParameters jobParameters = jobParametersConverter.getJobParameters(StringUtils
295 .splitArrayElementsIntoProperties(parameters, "="));
296 Assert.isTrue(parameters == null || parameters.length == 0 || !jobParameters.isEmpty(),
297 "Invalid JobParameters " + Arrays.asList(parameters)
298 + ". If parameters are provided they should be in the form name=value (no whitespace).");
299
300 if (opts.contains("-stop")) {
301 List<JobExecution> jobExecutions = getRunningJobExecutions(jobIdentifier);
302 if (jobExecutions == null) {
303 throw new JobExecutionNotRunningException("No running execution found for job=" + jobIdentifier);
304 }
305 for (JobExecution jobExecution : jobExecutions) {
306 jobExecution.setStatus(BatchStatus.STOPPING);
307 jobRepository.update(jobExecution);
308 }
309 return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode());
310 }
311
312 if (opts.contains("-abandon")) {
313 List<JobExecution> jobExecutions = getStoppedJobExecutions(jobIdentifier);
314 if (jobExecutions == null) {
315 throw new JobExecutionNotStoppedException("No stopped execution found for job=" + jobIdentifier);
316 }
317 for (JobExecution jobExecution : jobExecutions) {
318 jobExecution.setStatus(BatchStatus.ABANDONED);
319 jobRepository.update(jobExecution);
320 }
321 return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode());
322 }
323
324 if (opts.contains("-restart")) {
325 JobExecution jobExecution = getLastFailedJobExecution(jobIdentifier);
326 if (jobExecution == null) {
327 throw new JobExecutionNotFailedException("No failed or stopped execution found for job="
328 + jobIdentifier);
329 }
330 jobParameters = jobExecution.getJobInstance().getJobParameters();
331 jobName = jobExecution.getJobInstance().getJobName();
332 }
333
334 Job job;
335 if (jobLocator != null) {
336 job = jobLocator.getJob(jobName);
337 }
338 else {
339 job = (Job) context.getBean(jobName);
340 }
341
342 if (opts.contains("-next")) {
343 JobParameters nextParameters = getNextJobParameters(job);
344 Map<String, JobParameter> map = new HashMap<String, JobParameter>(nextParameters.getParameters());
345 map.putAll(jobParameters.getParameters());
346 jobParameters = new JobParameters(map);
347 }
348
349 JobExecution jobExecution = launcher.run(job, jobParameters);
350 return exitCodeMapper.intValue(jobExecution.getExitStatus().getExitCode());
351
352 }
353 catch (Throwable e) {
354 String message = "Job Terminated in error: " + e.getMessage();
355 logger.error(message, e);
356 CommandLineJobRunner.message = message;
357 return exitCodeMapper.intValue(ExitStatus.FAILED.getExitCode());
358 }
359 finally {
360 if (context != null) {
361 context.close();
362 }
363 }
364 }
365
366
367
368
369
370
371 private List<JobExecution> getJobExecutionsWithStatusGreaterThan(String jobIdentifier, BatchStatus minStatus) {
372
373 Long executionId = getLongIdentifier(jobIdentifier);
374 if (executionId != null) {
375 JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
376 if (jobExecution.getStatus().isGreaterThan(minStatus)) {
377 return Arrays.asList(jobExecution);
378 }
379 return Collections.emptyList();
380 }
381
382 int start = 0;
383 int count = 100;
384 List<JobExecution> executions = new ArrayList<JobExecution>();
385 List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count);
386
387 while (!lastInstances.isEmpty()) {
388
389 for (JobInstance jobInstance : lastInstances) {
390 List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
391 if (jobExecutions == null || jobExecutions.isEmpty()) {
392 continue;
393 }
394 for (JobExecution jobExecution : jobExecutions) {
395 if (jobExecution.getStatus().isGreaterThan(minStatus)) {
396 executions.add(jobExecution);
397 }
398 }
399 }
400
401 start += count;
402 lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count);
403
404 }
405
406 return executions;
407
408 }
409
410 private JobExecution getLastFailedJobExecution(String jobIdentifier) {
411 List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STOPPING);
412 if (jobExecutions.isEmpty()) {
413 return null;
414 }
415 return jobExecutions.get(0);
416 }
417
418 private List<JobExecution> getStoppedJobExecutions(String jobIdentifier) {
419 List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STARTED);
420 if (jobExecutions.isEmpty()) {
421 return null;
422 }
423 List<JobExecution> result = new ArrayList<JobExecution>();
424 for (JobExecution jobExecution : jobExecutions) {
425 if (jobExecution.getStatus() != BatchStatus.ABANDONED) {
426 result.add(jobExecution);
427 }
428 }
429 return result.isEmpty() ? null : result;
430 }
431
432 private List<JobExecution> getRunningJobExecutions(String jobIdentifier) {
433 List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.COMPLETED);
434 if (jobExecutions.isEmpty()) {
435 return null;
436 }
437 List<JobExecution> result = new ArrayList<JobExecution>();
438 for (JobExecution jobExecution : jobExecutions) {
439 if (jobExecution.isRunning()) {
440 result.add(jobExecution);
441 }
442 }
443 return result.isEmpty() ? null : result;
444 }
445
446 private Long getLongIdentifier(String jobIdentifier) {
447 try {
448 return new Long(jobIdentifier);
449 }
450 catch (NumberFormatException e) {
451
452 return null;
453 }
454 }
455
456
457
458
459
460
461 private JobParameters getNextJobParameters(Job job) throws JobParametersNotFoundException {
462 String jobIdentifier = job.getName();
463 JobParameters jobParameters;
464 List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, 0, 1);
465
466 JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
467 if (incrementer == null) {
468 throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobIdentifier);
469 }
470
471 if (lastInstances.isEmpty()) {
472 jobParameters = incrementer.getNext(new JobParameters());
473 if (jobParameters == null) {
474 throw new JobParametersNotFoundException("No bootstrap parameters found from incrementer for job="
475 + jobIdentifier);
476 }
477 }
478 else {
479 jobParameters = incrementer.getNext(lastInstances.get(0).getJobParameters());
480 }
481 return jobParameters;
482 }
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512 public static void main(String[] args) throws Exception {
513
514 CommandLineJobRunner command = new CommandLineJobRunner();
515
516 List<String> newargs = new ArrayList<String>(Arrays.asList(args));
517
518 try {
519 if (System.in.available() > 0) {
520 BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
521 String line = " ";
522 while (StringUtils.hasLength(line)) {
523 if (!line.startsWith("#") && StringUtils.hasText(line)) {
524 logger.debug("Stdin arg: " + line);
525 newargs.add(line);
526 }
527 line = reader.readLine();
528 }
529 }
530 }
531 catch (IOException e) {
532 logger.warn("Could not access stdin (maybe a platform limitation)");
533 if (logger.isDebugEnabled()) {
534 logger.debug("Exception details", e);
535 }
536 }
537
538 Set<String> opts = new HashSet<String>();
539 List<String> params = new ArrayList<String>();
540
541 int count = 0;
542 String jobPath = null;
543 String jobIdentifier = null;
544
545 for (String arg : newargs) {
546 if (arg.startsWith("-")) {
547 opts.add(arg);
548 }
549 else {
550 switch (count) {
551 case 0:
552 jobPath = arg;
553 break;
554 case 1:
555 jobIdentifier = arg;
556 break;
557 default:
558 params.add(arg);
559 break;
560 }
561 count++;
562 }
563 }
564
565 if (jobPath == null || jobIdentifier == null) {
566 String message = "At least 2 arguments are required: JobPath and jobIdentifier.";
567 logger.error(message);
568 CommandLineJobRunner.message = message;
569 command.exit(1);
570 }
571
572 String[] parameters = params.toArray(new String[params.size()]);
573
574 int result = command.start(jobPath, jobIdentifier, parameters, opts);
575 command.exit(result);
576 }
577
578 }