1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.poller;
17
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Future;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23
24
25
26
27
28
29
30
31
32
33
34 public class DirectPoller<S> implements Poller<S> {
35
36 private final long interval;
37
38 public DirectPoller(long interval) {
39 this.interval = interval;
40 }
41
42
43
44
45
46
47
48
49 @Override
50 public Future<S> poll(Callable<S> callable) throws Exception {
51 return new DirectPollingFuture<S>(interval, callable);
52 }
53
54 private static class DirectPollingFuture<S> implements Future<S> {
55
56 private final long startTime = System.currentTimeMillis();
57
58 private volatile boolean cancelled;
59
60 private volatile S result = null;
61
62 private final long interval;
63
64 private final Callable<S> callable;
65
66 public DirectPollingFuture(long interval, Callable<S> callable) {
67 this.interval = interval;
68 this.callable = callable;
69 }
70
71 @Override
72 public boolean cancel(boolean mayInterruptIfRunning) {
73 cancelled = true;
74 return true;
75 }
76
77 @Override
78 public S get() throws InterruptedException, ExecutionException {
79 try {
80 return get(-1, TimeUnit.MILLISECONDS);
81 }
82 catch (TimeoutException e) {
83 throw new IllegalStateException("Unexpected timeout waiting for result", e);
84 }
85 }
86
87 @Override
88 public S get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
89
90 try {
91 result = callable.call();
92 }
93 catch (Exception e) {
94 throw new ExecutionException(e);
95 }
96
97 Long nextExecutionTime = startTime + interval;
98 long currentTimeMillis = System.currentTimeMillis();
99 long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
100
101 while (result == null && !cancelled) {
102
103 long delta = nextExecutionTime - startTime;
104 if (delta >= timeoutMillis && timeoutMillis > 0) {
105 throw new TimeoutException("Timed out waiting for task to return non-null result");
106 }
107
108 if (nextExecutionTime > currentTimeMillis) {
109 Thread.sleep(nextExecutionTime - currentTimeMillis);
110 }
111
112 currentTimeMillis = System.currentTimeMillis();
113 nextExecutionTime = currentTimeMillis + interval;
114
115 try {
116 result = callable.call();
117 }
118 catch (Exception e) {
119 throw new ExecutionException(e);
120 }
121
122 }
123
124 return result;
125
126 }
127
128 @Override
129 public boolean isCancelled() {
130 return cancelled;
131 }
132
133 @Override
134 public boolean isDone() {
135 return cancelled || result != null;
136 }
137
138 }
139
140 }