1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.job.builder;
17
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26
27 import org.springframework.batch.core.ExitStatus;
28 import org.springframework.batch.core.Step;
29 import org.springframework.batch.core.job.flow.Flow;
30 import org.springframework.batch.core.job.flow.FlowExecutionStatus;
31 import org.springframework.batch.core.job.flow.JobExecutionDecider;
32 import org.springframework.batch.core.job.flow.State;
33 import org.springframework.batch.core.job.flow.support.SimpleFlow;
34 import org.springframework.batch.core.job.flow.support.StateTransition;
35 import org.springframework.batch.core.job.flow.support.state.DecisionState;
36 import org.springframework.batch.core.job.flow.support.state.EndState;
37 import org.springframework.batch.core.job.flow.support.state.FlowState;
38 import org.springframework.batch.core.job.flow.support.state.SplitState;
39 import org.springframework.batch.core.job.flow.support.state.StepState;
40 import org.springframework.core.task.TaskExecutor;
41
42
43
44
45
46
47
48
49
50
51
52
53 public class FlowBuilder<Q> {
54
55 private String name;
56
57 private String prefix;
58
59 private List<StateTransition> transitions = new ArrayList<StateTransition>();
60
61 private Map<String, State> tos = new HashMap<String, State>();
62
63 private State currentState;
64
65 private EndState failedState;
66
67 private EndState completedState;
68
69 private EndState stoppedState;
70
71 private int decisionCounter = 0;
72
73 private int splitCounter = 0;
74
75 private int endCounter = 0;
76
77 private Map<Object, State> states = new HashMap<Object, State>();
78
79 private SimpleFlow flow;
80
81 private boolean dirty = true;
82
83 public FlowBuilder(String name) {
84 this.name = name;
85 this.prefix = name + ".";
86 this.failedState = new EndState(FlowExecutionStatus.FAILED, prefix + "FAILED");
87 this.completedState = new EndState(FlowExecutionStatus.COMPLETED, prefix + "COMPLETED");
88 this.stoppedState = new EndState(FlowExecutionStatus.STOPPED, prefix + "STOPPED");
89 }
90
91
92
93
94
95
96
97 public Q build() {
98 @SuppressWarnings("unchecked")
99 Q result = (Q) flow();
100 return result;
101 }
102
103
104
105
106
107
108
109
110 public FlowBuilder<Q> next(Step step) {
111 doNext(step);
112 return this;
113 }
114
115
116
117
118
119
120
121 public FlowBuilder<Q> start(Step step) {
122 doStart(step);
123 return this;
124 }
125
126
127
128
129
130
131
132
133 public FlowBuilder<Q> from(Step step) {
134 doFrom(step);
135 return this;
136 }
137
138
139
140
141
142
143
144
145 public UnterminatedFlowBuilder<Q> next(JobExecutionDecider decider) {
146 doNext(decider);
147 return new UnterminatedFlowBuilder<Q>(this);
148 }
149
150
151
152
153
154
155
156 public UnterminatedFlowBuilder<Q> start(JobExecutionDecider decider) {
157 doStart(decider);
158 return new UnterminatedFlowBuilder<Q>(this);
159 }
160
161
162
163
164
165
166
167 public UnterminatedFlowBuilder<Q> from(JobExecutionDecider decider) {
168 doFrom(decider);
169 return new UnterminatedFlowBuilder<Q>(this);
170 }
171
172
173
174
175
176
177
178 public FlowBuilder<Q> next(Flow flow) {
179 doNext(flow);
180 return this;
181 }
182
183
184
185
186
187
188
189 public FlowBuilder<Q> from(Flow flow) {
190 doFrom(flow);
191 return this;
192 }
193
194
195
196
197
198
199
200 public FlowBuilder<Q> start(Flow flow) {
201 doStart(flow);
202 return this;
203 }
204
205
206
207
208
209 public SplitBuilder<Q> split(TaskExecutor executor) {
210 return new SplitBuilder<Q>(this, executor);
211 }
212
213
214
215
216
217
218
219
220
221 public TransitionBuilder<Q> on(String pattern) {
222 return new TransitionBuilder<Q>(this, pattern);
223 }
224
225
226
227
228
229
230
231 public final Q end() {
232 return build();
233 }
234
235 protected Flow flow() {
236 if (!dirty) {
237
238 return flow;
239 }
240 flow = new SimpleFlow(name);
241
242 if (currentState instanceof FlowState && states.size() == 1) {
243 return ((FlowState) currentState).getFlows().iterator().next();
244 }
245 addDanglingEndStates();
246 flow.setStateTransitions(transitions);
247 dirty = false;
248 return flow;
249 }
250
251 private void doNext(Object input) {
252 if (this.currentState == null) {
253 doStart(input);
254 }
255 State next = createState(input);
256 addTransition("COMPLETED", next);
257 addTransition("*", failedState);
258 this.currentState = next;
259 }
260
261 private void doStart(Object input) {
262 if (this.currentState != null) {
263 doFrom(input);
264 }
265 this.currentState = createState(input);
266 }
267
268 private void doFrom(Object input) {
269 if (currentState == null) {
270 doStart(input);
271 }
272 State state = createState(input);
273 tos.put(currentState.getName(), currentState);
274 this.currentState = state;
275 }
276
277 private State createState(Object input) {
278 State result;
279 if (input instanceof Step) {
280 if (!states.containsKey(input)) {
281 Step step = (Step) input;
282 states.put(input, new StepState(prefix + step.getName(), step));
283 }
284 result = states.get(input);
285 }
286 else if (input instanceof JobExecutionDecider) {
287 if (!states.containsKey(input)) {
288 states.put(input, new DecisionState((JobExecutionDecider) input, prefix + "decision"
289 + (decisionCounter++)));
290 }
291 result = states.get(input);
292 }
293 else if (input instanceof Flow) {
294 if (!states.containsKey(input)) {
295 states.put(input, new FlowState((Flow) input, prefix + ((Flow) input).getName()));
296 }
297 result = states.get(input);
298 }
299 else {
300 throw new FlowBuilderException("No state can be created for: " + input);
301 }
302 dirty = true;
303 return result;
304 }
305
306 private SplitState createState(Collection<Flow> flows, TaskExecutor executor) {
307 if (!states.containsKey(flows)) {
308 states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++)));
309 }
310 SplitState result = (SplitState) states.get(flows);
311 if (executor != null) {
312 result.setTaskExecutor(executor);
313 }
314 dirty = true;
315 return result;
316 }
317
318 private void addDanglingEndStates() {
319 Set<String> froms = new HashSet<String>();
320 for (StateTransition transition : transitions) {
321 froms.add(transition.getState().getName());
322 }
323 if (tos.isEmpty() && currentState != null) {
324 tos.put(currentState.getName(), currentState);
325 }
326 Map<String, State> copy = new HashMap<String, State>(tos);
327
328 for (String to : copy.keySet()) {
329 if (!froms.contains(to)) {
330 currentState = copy.get(to);
331 if (!currentState.isEndState()) {
332 addTransition("COMPLETED", completedState);
333 addTransition("*", failedState);
334 }
335 }
336 }
337 copy = new HashMap<String, State>(tos);
338
339 for (String from : copy.keySet()) {
340 currentState = copy.get(from);
341 if (!currentState.isEndState()) {
342 if (!hasFail(from)) {
343 addTransition("*", failedState);
344 }
345 if (!hasCompleted(from)) {
346 addTransition("*", completedState);
347 }
348 }
349 }
350 }
351
352 private boolean hasFail(String from) {
353 return matches(from, "FAILED");
354 }
355
356 private boolean hasCompleted(String from) {
357 return matches(from, "COMPLETED");
358 }
359
360 private boolean matches(String from, String status) {
361 for (StateTransition transition : transitions) {
362 if (from.equals(transition.getState().getName()) && transition.matches(status)) {
363 return true;
364 }
365 }
366 return false;
367 }
368
369 private void addTransition(String pattern, State next) {
370 tos.put(next.getName(), next);
371 transitions.add(StateTransition.createStateTransition(currentState, pattern, next.getName()));
372 if (transitions.size() == 1) {
373 transitions.add(StateTransition.createEndStateTransition(failedState));
374 transitions.add(StateTransition.createEndStateTransition(completedState));
375 transitions.add(StateTransition.createEndStateTransition(stoppedState));
376 }
377 if (next.isEndState()) {
378 transitions.add(StateTransition.createEndStateTransition(next));
379 }
380 dirty = true;
381 }
382
383 private void stop(String pattern) {
384 addTransition(pattern, stoppedState);
385 }
386
387 private void stop(String pattern, State restart) {
388 EndState next = new EndState(FlowExecutionStatus.STOPPED, "STOPPED", prefix + "stop" + (endCounter++), true);
389 addTransition(pattern, next);
390 currentState = next;
391 addTransition("*", restart);
392 }
393
394 private void end(String pattern) {
395 addTransition(pattern, completedState);
396 }
397
398 private void end(String pattern, String code) {
399 addTransition(pattern, new EndState(FlowExecutionStatus.COMPLETED, code, prefix + "end" + (endCounter++)));
400 }
401
402 private void fail(String pattern) {
403 addTransition(pattern, failedState);
404 }
405
406
407
408
409
410
411
412
413 public static class UnterminatedFlowBuilder<Q> {
414
415 private final FlowBuilder<Q> parent;
416
417 public UnterminatedFlowBuilder(FlowBuilder<Q> parent) {
418 this.parent = parent;
419 }
420
421
422
423
424
425
426
427
428
429 public TransitionBuilder<Q> on(String pattern) {
430 return new TransitionBuilder<Q>(parent, pattern);
431 }
432
433 }
434
435
436
437
438
439
440
441
442 public static class TransitionBuilder<Q> {
443
444 private final FlowBuilder<Q> parent;
445
446 private final String pattern;
447
448 public TransitionBuilder(FlowBuilder<Q> parent, String pattern) {
449 this.parent = parent;
450 this.pattern = pattern;
451 }
452
453
454
455
456
457
458
459 public FlowBuilder<Q> to(Step step) {
460 State next = parent.createState(step);
461 parent.addTransition(pattern, next);
462 parent.currentState = next;
463 return parent;
464 }
465
466
467
468
469
470
471
472 public FlowBuilder<Q> to(Flow flow) {
473 State next = parent.createState(flow);
474 parent.addTransition(pattern, next);
475 parent.currentState = next;
476 return parent;
477 }
478
479
480
481
482
483
484
485 public FlowBuilder<Q> to(JobExecutionDecider decider) {
486 State next = parent.createState(decider);
487 parent.addTransition(pattern, next);
488 parent.currentState = next;
489 return parent;
490 }
491
492
493
494
495
496
497 public FlowBuilder<Q> stop() {
498 parent.stop(pattern);
499 return parent;
500 }
501
502
503
504
505
506
507
508 public FlowBuilder<Q> stopAndRestart(Flow flow) {
509 State next = parent.createState(flow);
510 parent.stop(pattern, next);
511 return parent;
512 }
513
514
515
516
517
518
519
520 public FlowBuilder<Q> stopAndRestart(JobExecutionDecider decider) {
521 State next = parent.createState(decider);
522 parent.stop(pattern, next);
523 return parent;
524 }
525
526
527
528
529
530
531
532 public FlowBuilder<Q> stopAndRestart(Step restart) {
533 State next = parent.createState(restart);
534 parent.stop(pattern, next);
535 return parent;
536 }
537
538
539
540
541
542
543 public FlowBuilder<Q> end() {
544 parent.end(pattern);
545 return parent;
546 }
547
548
549
550
551
552
553 public FlowBuilder<Q> end(String status) {
554 parent.end(pattern, status);
555 return parent;
556 }
557
558
559
560
561
562
563 public FlowBuilder<Q> fail() {
564 parent.fail(pattern);
565 return parent;
566 }
567 }
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589 public static class SplitBuilder<Q> {
590
591 private final FlowBuilder<Q> parent;
592
593 private TaskExecutor executor;
594
595
596
597
598
599 public SplitBuilder(FlowBuilder<Q> parent, TaskExecutor executor) {
600 this.parent = parent;
601 this.executor = executor;
602 }
603
604
605
606
607
608
609
610 public FlowBuilder<Q> add(Flow... flows) {
611 Collection<Flow> list = new ArrayList<Flow>(Arrays.asList(flows));
612 String name = "split" + (parent.splitCounter++);
613 int counter = 0;
614 State one = parent.currentState;
615 Flow flow = null;
616 if (!(one instanceof FlowState)) {
617 FlowBuilder<Flow> stateBuilder = new FlowBuilder<Flow>(name + "_" + (counter++));
618 stateBuilder.currentState = one;
619 flow = stateBuilder.build();
620 }
621 if (flow != null) {
622 list.add(flow);
623 }
624 State next = parent.createState(list, executor);
625 parent.currentState = next;
626 return parent;
627 }
628
629 }
630
631 }