1 /*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.springframework.batch.repeat.support;
18
19 import org.springframework.batch.repeat.RepeatCallback;
20 import org.springframework.batch.repeat.RepeatContext;
21 import org.springframework.batch.repeat.RepeatException;
22 import org.springframework.batch.repeat.RepeatOperations;
23 import org.springframework.batch.repeat.RepeatStatus;
24 import org.springframework.core.task.SyncTaskExecutor;
25 import org.springframework.core.task.TaskExecutor;
26 import org.springframework.util.Assert;
27
28 /**
29 * Provides {@link RepeatOperations} support including interceptors that can be
30 * used to modify or monitor the behaviour at run time.<br/>
31 *
32 * This implementation is sufficient to be used to configure transactional
33 * behaviour for each item by making the {@link RepeatCallback} transactional,
34 * or for the whole batch by making the execute method transactional (but only
35 * then if the task executor is synchronous).<br/>
36 *
37 * This class is thread safe if its collaborators are thread safe (interceptors,
38 * terminationPolicy, callback). Normally this will be the case, but clients
39 * need to be aware that if the task executor is asynchronous, then the other
40 * collaborators should be also. In particular the {@link RepeatCallback} that
41 * is wrapped in the execute method must be thread safe - often it is based on
42 * some form of data source, which itself should be both thread safe and
43 * transactional (multiple threads could be accessing it at any given time, and
44 * each thread would have its own transaction).<br/>
45 *
46 * @author Dave Syer
47 *
48 */
49 public class TaskExecutorRepeatTemplate extends RepeatTemplate {
50
51 /**
52 * Default limit for maximum number of concurrent unfinished results allowed
53 * by the template.
54 * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)}
55 * .
56 */
57 public static final int DEFAULT_THROTTLE_LIMIT = 4;
58
59 private int throttleLimit = DEFAULT_THROTTLE_LIMIT;
60
61 private TaskExecutor taskExecutor = new SyncTaskExecutor();
62
63 /**
64 * Public setter for the throttle limit. The throttle limit is the largest
65 * number of concurrent tasks that can be executing at one time - if a new
66 * task arrives and the throttle limit is breached we wait for one of the
67 * executing tasks to finish before submitting the new one to the
68 * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}.
69 * N.B. when used with a thread pooled {@link TaskExecutor} the thread pool
70 * might prevent the throttle limit actually being reached (so make the core
71 * pool size larger than the throttle limit if possible).
72 *
73 * @param throttleLimit the throttleLimit to set.
74 */
75 public void setThrottleLimit(int throttleLimit) {
76 this.throttleLimit = throttleLimit;
77 }
78
79 /**
80 * Setter for task executor to be used to run the individual item callbacks.
81 *
82 * @param taskExecutor a TaskExecutor
83 * @throws IllegalArgumentException if the argument is null
84 */
85 public void setTaskExecutor(TaskExecutor taskExecutor) {
86 Assert.notNull(taskExecutor);
87 this.taskExecutor = taskExecutor;
88 }
89
90 /**
91 * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The
92 * internal state in this case is a queue of unfinished result holders of
93 * type {@link ResultHolder}. The holder with the return value should not be
94 * on the queue when this method exits. The queue is scoped in the calling
95 * method so there is no need to synchronize access.
96 *
97 */
98 @Override
99 protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
100 throws Throwable {
101
102 ExecutingRunnable runnable = null;
103
104 ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
105
106 do {
107
108 /*
109 * Wrap the callback in a runnable that will add its result to the
110 * queue when it is ready.
111 */
112 runnable = new ExecutingRunnable(callback, context, queue);
113
114 /**
115 * Tell the runnable that it can expect a result. This could have
116 * been in-lined with the constructor, but it might block, so it's
117 * better to do it here, since we have the option (it's a private
118 * class).
119 */
120 runnable.expect();
121
122 /*
123 * Start the task possibly concurrently / in the future.
124 */
125 taskExecutor.execute(runnable);
126
127 /*
128 * Allow termination policy to update its state. This must happen
129 * immediately before or after the call to the task executor.
130 */
131 update(context);
132
133 /*
134 * Keep going until we get a result that is finished, or early
135 * termination...
136 */
137 } while (queue.isEmpty() && !isComplete(context));
138
139 /*
140 * N.B. If the queue is empty then take() blocks until a result appears,
141 * and there must be at least one because we just submitted one to the
142 * task executor.
143 */
144 ResultHolder result = queue.take();
145 if (result.getError() != null) {
146 throw result.getError();
147 }
148 return result.getResult();
149 }
150
151 /**
152 * Wait for all the results to appear on the queue and execute the after
153 * interceptors for each one.
154 *
155 * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState)
156 */
157 @Override
158 protected boolean waitForResults(RepeatInternalState state) {
159
160 ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
161
162 boolean result = true;
163
164 while (queue.isExpecting()) {
165
166 /*
167 * Careful that no runnables that are not going to finish ever get
168 * onto the queue, else this may block forever.
169 */
170 ResultHolder future;
171 try {
172 future = queue.take();
173 }
174 catch (InterruptedException e) {
175 Thread.currentThread().interrupt();
176 throw new RepeatException("InterruptedException while waiting for result.");
177 }
178
179 if (future.getError() != null) {
180 state.getThrowables().add(future.getError());
181 result = false;
182 }
183 else {
184 RepeatStatus status = future.getResult();
185 result = result && canContinue(status);
186 executeAfterInterceptors(future.getContext(), status);
187 }
188
189 }
190
191 Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch.");
192
193 return result;
194 }
195
196 @Override
197 protected RepeatInternalState createInternalState(RepeatContext context) {
198 // Queue of pending results:
199 return new ResultQueueInternalState(throttleLimit);
200 }
201
202 /**
203 * A runnable that puts its result on a queue when it is done.
204 *
205 * @author Dave Syer
206 *
207 */
208 private class ExecutingRunnable implements Runnable, ResultHolder {
209
210 private final RepeatCallback callback;
211
212 private final RepeatContext context;
213
214 private final ResultQueue<ResultHolder> queue;
215
216 private volatile RepeatStatus result;
217
218 private volatile Throwable error;
219
220 public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) {
221
222 super();
223
224 this.callback = callback;
225 this.context = context;
226 this.queue = queue;
227
228 }
229
230 /**
231 * Tell the queue to expect a result.
232 */
233 public void expect() {
234 try {
235 queue.expect();
236 }
237 catch (InterruptedException e) {
238 Thread.currentThread().interrupt();
239 throw new RepeatException("InterruptedException waiting for to acquire lock on input.");
240 }
241 }
242
243 /**
244 * Execute the batch callback, and store the result, or any exception
245 * that is thrown for retrieval later by caller.
246 *
247 * @see java.lang.Runnable#run()
248 */
249 @Override
250 public void run() {
251 boolean clearContext = false;
252 try {
253 if (RepeatSynchronizationManager.getContext() == null) {
254 clearContext = true;
255 RepeatSynchronizationManager.register(context);
256 }
257
258 if (logger.isDebugEnabled()) {
259 logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
260 }
261
262 result = callback.doInIteration(context);
263
264 }
265 catch (Throwable e) {
266 error = e;
267 }
268 finally {
269
270 if (clearContext) {
271 RepeatSynchronizationManager.clear();
272 }
273
274 queue.put(this);
275
276 }
277 }
278
279 /**
280 * Get the result - never blocks because the queue manages waiting for
281 * the task to finish.
282 */
283 @Override
284 public RepeatStatus getResult() {
285 return result;
286 }
287
288 /**
289 * Get the error - never blocks because the queue manages waiting for
290 * the task to finish.
291 */
292 @Override
293 public Throwable getError() {
294 return error;
295 }
296
297 /**
298 * Getter for the context.
299 */
300 @Override
301 public RepeatContext getContext() {
302 return this.context;
303 }
304
305 }
306
307 /**
308 * @author Dave Syer
309 *
310 */
311 private static class ResultQueueInternalState extends RepeatInternalStateSupport {
312
313 private final ResultQueue<ResultHolder> results;
314
315 /**
316 * @param throttleLimit the throttle limit for the result queue
317 */
318 public ResultQueueInternalState(int throttleLimit) {
319 super();
320 this.results = new ResultHolderResultQueue(throttleLimit);
321 }
322
323 /**
324 * @return the result queue
325 */
326 public ResultQueue<ResultHolder> getResultQueue() {
327 return results;
328 }
329
330 }
331
332 }