1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.launch.support;
17
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.springframework.batch.core.BatchStatus;
21 import org.springframework.batch.core.ExitStatus;
22 import org.springframework.batch.core.Job;
23 import org.springframework.batch.core.JobExecution;
24 import org.springframework.batch.core.JobInstance;
25 import org.springframework.batch.core.JobParameters;
26 import org.springframework.batch.core.JobParametersInvalidException;
27 import org.springframework.batch.core.launch.JobLauncher;
28 import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
29 import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
30 import org.springframework.batch.core.repository.JobRepository;
31 import org.springframework.batch.core.repository.JobRestartException;
32 import org.springframework.beans.factory.InitializingBean;
33 import org.springframework.core.task.SyncTaskExecutor;
34 import org.springframework.core.task.TaskExecutor;
35 import org.springframework.core.task.TaskRejectedException;
36 import org.springframework.util.Assert;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public class SimpleJobLauncher implements JobLauncher, InitializingBean {
63
64 protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);
65
66 private JobRepository jobRepository;
67
68 private TaskExecutor taskExecutor;
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 @Override
87 public JobExecution run(final Job job, final JobParameters jobParameters)
88 throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
89 JobParametersInvalidException {
90
91 Assert.notNull(job, "The Job must not be null.");
92 Assert.notNull(jobParameters, "The JobParameters must not be null.");
93
94 final JobExecution jobExecution;
95 JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
96 if (lastExecution != null) {
97 if (!job.isRestartable()) {
98 throw new JobRestartException("JobInstance already exists and is not restartable");
99 }
100 }
101
102
103
104 job.getJobParametersValidator().validate(jobParameters);
105
106
107
108
109
110
111
112 jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
113
114 try {
115 taskExecutor.execute(new Runnable() {
116
117 @Override
118 public void run() {
119 try {
120 logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
121 + "]");
122 job.execute(jobExecution);
123 logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
124 + "] and the following status: [" + jobExecution.getStatus() + "]");
125 }
126 catch (Throwable t) {
127 logger.info("Job: [" + job
128 + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
129 + "]", t);
130 rethrow(t);
131 }
132 }
133
134 private void rethrow(Throwable t) {
135 if (t instanceof RuntimeException) {
136 throw (RuntimeException) t;
137 }
138 else if (t instanceof Error) {
139 throw (Error) t;
140 }
141 throw new IllegalStateException(t);
142 }
143 });
144 }
145 catch (TaskRejectedException e) {
146 jobExecution.upgradeStatus(BatchStatus.FAILED);
147 if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
148 jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
149 }
150 jobRepository.update(jobExecution);
151 }
152
153 return jobExecution;
154 }
155
156
157
158
159
160
161 public void setJobRepository(JobRepository jobRepository) {
162 this.jobRepository = jobRepository;
163 }
164
165
166
167
168
169
170 public void setTaskExecutor(TaskExecutor taskExecutor) {
171 this.taskExecutor = taskExecutor;
172 }
173
174
175
176
177
178 @Override
179 public void afterPropertiesSet() throws Exception {
180 Assert.state(jobRepository != null, "A JobRepository has not been set.");
181 if (taskExecutor == null) {
182 logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
183 taskExecutor = new SyncTaskExecutor();
184 }
185 }
186
187 }