1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.job.flow.support;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.SortedSet;
26 import java.util.TreeSet;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.springframework.batch.core.JobExecutionException;
31 import org.springframework.batch.core.Step;
32 import org.springframework.batch.core.job.flow.Flow;
33 import org.springframework.batch.core.job.flow.FlowExecution;
34 import org.springframework.batch.core.job.flow.FlowExecutionException;
35 import org.springframework.batch.core.job.flow.FlowExecutionStatus;
36 import org.springframework.batch.core.job.flow.FlowExecutor;
37 import org.springframework.batch.core.job.flow.State;
38 import org.springframework.beans.factory.InitializingBean;
39
40
41
42
43
44
45
46
47
48
49
50 public class SimpleFlow implements Flow, InitializingBean {
51
52 private static final Log logger = LogFactory.getLog(SimpleFlow.class);
53
54 private State startState;
55
56 private Map<String, SortedSet<StateTransition>> transitionMap = new HashMap<String, SortedSet<StateTransition>>();
57
58 private Map<String, State> stateMap = new HashMap<String, State>();
59
60 private List<StateTransition> stateTransitions = new ArrayList<StateTransition>();
61
62 private final String name;
63
64
65
66
67
68
69 public SimpleFlow(String name) {
70 this.name = name;
71 }
72
73
74
75
76
77
78 @Override
79 public String getName() {
80 return name;
81 }
82
83
84
85
86
87
88 public void setStateTransitions(List<StateTransition> stateTransitions) {
89
90 this.stateTransitions = stateTransitions;
91 }
92
93
94
95
96 @Override
97 public State getState(String stateName) {
98 return stateMap.get(stateName);
99 }
100
101
102
103
104 @Override
105 public Collection<State> getStates() {
106 return new HashSet<State>(stateMap.values());
107 }
108
109
110
111
112
113
114 @Override
115 public void afterPropertiesSet() throws Exception {
116 initializeTransitions();
117 }
118
119
120
121
122 @Override
123 public FlowExecution start(FlowExecutor executor) throws FlowExecutionException {
124 if (startState == null) {
125 initializeTransitions();
126 }
127 State state = startState;
128 String stateName = state.getName();
129 return resume(stateName, executor);
130 }
131
132
133
134
135 @Override
136 public FlowExecution resume(String stateName, FlowExecutor executor) throws FlowExecutionException {
137
138 FlowExecutionStatus status = FlowExecutionStatus.UNKNOWN;
139 State state = stateMap.get(stateName);
140
141 logger.debug("Resuming state="+stateName+" with status="+status);
142
143
144 while (state != null && status!=FlowExecutionStatus.STOPPED) {
145
146 stateName = state.getName();
147
148 try {
149 logger.debug("Handling state="+stateName);
150 status = state.handle(executor);
151 }
152 catch (FlowExecutionException e) {
153 executor.close(new FlowExecution(stateName, status));
154 throw e;
155 }
156 catch (Exception e) {
157 executor.close(new FlowExecution(stateName, status));
158 throw new FlowExecutionException(String.format("Ended flow=%s at state=%s with exception", name,
159 stateName), e);
160 }
161
162 logger.debug("Completed state="+stateName+" with status="+status);
163
164 state = nextState(stateName, status);
165
166 }
167
168 FlowExecution result = new FlowExecution(stateName, status);
169 executor.close(result);
170 return result;
171
172 }
173
174
175
176
177
178 private State nextState(String stateName, FlowExecutionStatus status) throws FlowExecutionException {
179
180 Set<StateTransition> set = transitionMap.get(stateName);
181
182 if (set == null) {
183 throw new FlowExecutionException(String.format("No transitions found in flow=%s for state=%s", getName(),
184 stateName));
185 }
186
187 String next = null;
188 String exitCode = status.getName();
189 for (StateTransition stateTransition : set) {
190 if (stateTransition.matches(exitCode)) {
191 if (stateTransition.isEnd()) {
192
193 return null;
194 }
195 next = stateTransition.getNext();
196 break;
197 }
198 }
199
200 if (next == null) {
201 throw new FlowExecutionException(String.format(
202 "Next state not found in flow=%s for state=%s with exit status=%s", getName(), stateName, status.getName()));
203 }
204
205 if (!stateMap.containsKey(next)) {
206 throw new FlowExecutionException(String.format("Next state not specified in flow=%s for next=%s",
207 getName(), next));
208 }
209
210 return stateMap.get(next);
211
212 }
213
214
215
216
217
218 private void initializeTransitions() {
219 startState = null;
220 transitionMap.clear();
221 stateMap.clear();
222 boolean hasEndStep = false;
223
224 if (stateTransitions.isEmpty()) {
225 throw new IllegalArgumentException("No start state was found. You must specify at least one step in a job.");
226 }
227
228 for (StateTransition stateTransition : stateTransitions) {
229 State state = stateTransition.getState();
230 String stateName = state.getName();
231 stateMap.put(stateName, state);
232 }
233
234 for (StateTransition stateTransition : stateTransitions) {
235
236 State state = stateTransition.getState();
237
238 if (!stateTransition.isEnd()) {
239
240 String next = stateTransition.getNext();
241
242 if (!stateMap.containsKey(next)) {
243 throw new IllegalArgumentException("Missing state for [" + stateTransition + "]");
244 }
245
246 }
247 else {
248 hasEndStep = true;
249 }
250
251 String name = state.getName();
252
253 SortedSet<StateTransition> set = transitionMap.get(name);
254 if (set == null) {
255 set = new TreeSet<StateTransition>();
256 transitionMap.put(name, set);
257 }
258 set.add(stateTransition);
259
260 }
261
262 if (!hasEndStep) {
263 throw new IllegalArgumentException(
264 "No end state was found. You must specify at least one transition with no next state.");
265 }
266
267 startState = stateTransitions.get(0).getState();
268
269 }
270 }