1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.step.tasklet;
18
19 import java.io.File;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.FutureTask;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.springframework.batch.core.ExitStatus;
26 import org.springframework.batch.core.JobInterruptedException;
27 import org.springframework.batch.core.StepContribution;
28 import org.springframework.batch.core.StepExecution;
29 import org.springframework.batch.core.listener.StepExecutionListenerSupport;
30 import org.springframework.batch.core.scope.context.ChunkContext;
31 import org.springframework.batch.repeat.RepeatStatus;
32 import org.springframework.beans.factory.InitializingBean;
33 import org.springframework.core.task.SimpleAsyncTaskExecutor;
34 import org.springframework.core.task.TaskExecutor;
35 import org.springframework.util.Assert;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean {
59
60 protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class);
61
62 private String command;
63
64 private String[] environmentParams = null;
65
66 private File workingDirectory = null;
67
68 private SystemProcessExitCodeMapper systemProcessExitCodeMapper = new SimpleSystemProcessExitCodeMapper();
69
70 private long timeout = 0;
71
72 private long checkInterval = 1000;
73
74 private StepExecution execution = null;
75
76 private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
77
78 private boolean interruptOnCancel = false;
79
80
81
82
83
84 @Override
85 public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
86
87 FutureTask<Integer> systemCommandTask = new FutureTask<Integer>(new Callable<Integer>() {
88
89 @Override
90 public Integer call() throws Exception {
91 Process process = Runtime.getRuntime().exec(command, environmentParams, workingDirectory);
92 return process.waitFor();
93 }
94
95 });
96
97 long t0 = System.currentTimeMillis();
98
99 taskExecutor.execute(systemCommandTask);
100
101 while (true) {
102 Thread.sleep(checkInterval);
103 if (systemCommandTask.isDone()) {
104 contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get()));
105 return RepeatStatus.FINISHED;
106 }
107 else if (System.currentTimeMillis() - t0 > timeout) {
108 systemCommandTask.cancel(interruptOnCancel);
109 throw new SystemCommandException("Execution of system command did not finish within the timeout");
110 }
111 else if (execution.isTerminateOnly()) {
112 systemCommandTask.cancel(interruptOnCancel);
113 throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'");
114 }
115 }
116
117 }
118
119
120
121
122 public void setCommand(String command) {
123 this.command = command;
124 }
125
126
127
128
129
130 public void setEnvironmentParams(String[] envp) {
131 this.environmentParams = envp;
132 }
133
134
135
136
137
138 public void setWorkingDirectory(String dir) {
139 if (dir == null) {
140 this.workingDirectory = null;
141 return;
142 }
143 this.workingDirectory = new File(dir);
144 Assert.isTrue(workingDirectory.exists(), "working directory must exist");
145 Assert.isTrue(workingDirectory.isDirectory(), "working directory value must be a directory");
146
147 }
148
149 @Override
150 public void afterPropertiesSet() throws Exception {
151 Assert.hasLength(command, "'command' property value is required");
152 Assert.notNull(systemProcessExitCodeMapper, "SystemProcessExitCodeMapper must be set");
153 Assert.isTrue(timeout > 0, "timeout value must be greater than zero");
154 Assert.notNull(taskExecutor, "taskExecutor is required");
155 }
156
157
158
159
160
161
162 public void setSystemProcessExitCodeMapper(SystemProcessExitCodeMapper systemProcessExitCodeMapper) {
163 this.systemProcessExitCodeMapper = systemProcessExitCodeMapper;
164 }
165
166
167
168
169
170
171 public void setTimeout(long timeout) {
172 this.timeout = timeout;
173 }
174
175
176
177
178
179
180
181 public void setTerminationCheckInterval(long checkInterval) {
182 this.checkInterval = checkInterval;
183 }
184
185
186
187
188
189 @Override
190 public void beforeStep(StepExecution stepExecution) {
191 this.execution = stepExecution;
192 }
193
194
195
196
197
198 public void setTaskExecutor(TaskExecutor taskExecutor) {
199 this.taskExecutor = taskExecutor;
200 }
201
202
203
204
205
206
207 public void setInterruptOnCancel(boolean interruptOnCancel) {
208 this.interruptOnCancel = interruptOnCancel;
209 }
210
211 }