1 /*
2 * Copyright 2002-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.springframework.batch.repeat.support;
18
19 import java.util.NoSuchElementException;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.Semaphore;
23
24 /**
25 * An implementation of the {@link ResultQueue} that throttles the number of
26 * expected results, limiting it to a maximum at any given time.
27 *
28 * @author Dave Syer
29 */
30 public class ThrottleLimitResultQueue<T> implements ResultQueue<T> {
31
32 // Accumulation of result objects as they finish.
33 private final BlockingQueue<T> results;
34
35 // Accumulation of dummy objects flagging expected results in the future.
36 private final Semaphore waits;
37
38 private final Object lock = new Object();
39
40 private volatile int count = 0;
41
42 /**
43 * @param throttleLimit the maximum number of results that can be expected
44 * at any given time.
45 */
46 public ThrottleLimitResultQueue(int throttleLimit) {
47 results = new LinkedBlockingQueue<T>();
48 waits = new Semaphore(throttleLimit);
49 }
50
51 @Override
52 public boolean isEmpty() {
53 return results.isEmpty();
54 }
55
56 /*
57 * (non-Javadoc)
58 *
59 * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
60 */
61 @Override
62 public boolean isExpecting() {
63 // Base the decision about whether we expect more results on a
64 // counter of the number of expected results actually collected.
65 // Do not synchronize! Otherwise put and expect can deadlock.
66 return count > 0;
67 }
68
69 /**
70 * Tell the queue to expect one more result. Blocks until a new result is
71 * available if already expecting too many (as determined by the throttle
72 * limit).
73 *
74 * @see ResultQueue#expect()
75 */
76 @Override
77 public void expect() throws InterruptedException {
78 synchronized (lock) {
79 waits.acquire();
80 count++;
81 }
82 }
83
84 @Override
85 public void put(T holder) throws IllegalArgumentException {
86 if (!isExpecting()) {
87 throw new IllegalArgumentException("Not expecting a result. Call expect() before put().");
88 }
89 // There should be no need to block here, or to use offer()
90 results.add(holder);
91 // Take from the waits queue now to allow another result to
92 // accumulate. But don't decrement the counter.
93 waits.release();
94 }
95
96 @Override
97 public T take() throws NoSuchElementException, InterruptedException {
98 if (!isExpecting()) {
99 throw new NoSuchElementException("Not expecting a result. Call expect() before take().");
100 }
101 T value;
102 synchronized (lock) {
103 value = results.take();
104 // Decrement the counter only when the result is collected.
105 count--;
106 }
107 return value;
108 }
109
110 }