1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.repeat.support;
18
19 import java.util.Comparator;
20 import java.util.NoSuchElementException;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.PriorityBlockingQueue;
23 import java.util.concurrent.Semaphore;
24
25 import org.springframework.batch.repeat.RepeatStatus;
26
27
28
29
30
31
32
33 public class ResultHolderResultQueue implements ResultQueue<ResultHolder> {
34
35
36 private final BlockingQueue<ResultHolder> results;
37
38
39 private final Semaphore waits;
40
41 private final Object lock = new Object();
42
43 private volatile int count = 0;
44
45
46
47
48
49 public ResultHolderResultQueue(int throttleLimit) {
50 results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator());
51 waits = new Semaphore(throttleLimit);
52 }
53
54 @Override
55 public boolean isEmpty() {
56 return results.isEmpty();
57 }
58
59
60
61
62
63
64 @Override
65 public boolean isExpecting() {
66
67
68
69 return count > 0;
70 }
71
72
73
74
75
76
77
78
79 @Override
80 public void expect() throws InterruptedException {
81 waits.acquire();
82
83 synchronized (lock) {
84 count++;
85 }
86 }
87
88 @Override
89 public void put(ResultHolder holder) throws IllegalArgumentException {
90 if (!isExpecting()) {
91 throw new IllegalArgumentException("Not expecting a result. Call expect() before put().");
92 }
93 results.add(holder);
94
95
96 waits.release();
97 synchronized (lock) {
98 lock.notifyAll();
99 }
100 }
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 @Override
123 public ResultHolder take() throws NoSuchElementException, InterruptedException {
124 if (!isExpecting()) {
125 throw new NoSuchElementException("Not expecting a result. Call expect() before take().");
126 }
127 ResultHolder value;
128 synchronized (lock) {
129 value = results.take();
130 if (isContinuable(value)) {
131
132 count--;
133 return value;
134 }
135 }
136 results.put(value);
137 synchronized (lock) {
138 while (count > results.size()) {
139 lock.wait();
140 }
141 value = results.take();
142 count--;
143 }
144 return value;
145 }
146
147 private boolean isContinuable(ResultHolder value) {
148 return value.getResult() != null && value.getResult().isContinuable();
149 }
150
151
152
153
154
155
156
157 private static class ResultHolderComparator implements Comparator<ResultHolder> {
158 @Override
159 public int compare(ResultHolder h1, ResultHolder h2) {
160 RepeatStatus result1 = h1.getResult();
161 RepeatStatus result2 = h2.getResult();
162 if (result1 == null && result2 == null) {
163 return 0;
164 }
165 if (result1 == null) {
166 return -1;
167 }
168 else if (result2 == null) {
169 return 1;
170 }
171 if ((result1.isContinuable() && result2.isContinuable())
172 || (!result1.isContinuable() && !result2.isContinuable())) {
173 return 0;
174 }
175 if (result1.isContinuable()) {
176 return -1;
177 }
178 return 1;
179 }
180 }
181
182 }