1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.job.flow;
17
18 import java.util.Collection;
19 import java.util.Map;
20 import java.util.concurrent.ConcurrentHashMap;
21
22 import org.springframework.batch.core.Job;
23 import org.springframework.batch.core.JobExecution;
24 import org.springframework.batch.core.JobExecutionException;
25 import org.springframework.batch.core.Step;
26 import org.springframework.batch.core.job.AbstractJob;
27 import org.springframework.batch.core.job.SimpleStepHandler;
28 import org.springframework.batch.core.step.StepHolder;
29 import org.springframework.batch.core.step.StepLocator;
30
31
32
33
34
35
36
37
38
39
40 public class FlowJob extends AbstractJob {
41
42 private Flow flow;
43
44 private Map<String, Step> stepMap = new ConcurrentHashMap<String, Step>();
45
46 private volatile boolean initialized = false;
47
48
49
50
51 public FlowJob() {
52 super();
53 }
54
55
56
57
58 public FlowJob(String name) {
59 super(name);
60 }
61
62
63
64
65
66
67 public void setFlow(Flow flow) {
68 this.flow = flow;
69 }
70
71
72
73
74 @Override
75 public Step getStep(String stepName) {
76 if (!initialized) {
77 init();
78 }
79 return stepMap.get(stepName);
80 }
81
82
83
84
85 private void init() {
86 findSteps(flow, stepMap);
87 }
88
89
90
91
92
93 private void findSteps(Flow flow, Map<String, Step> map) {
94
95 for (State state : flow.getStates()) {
96 if (state instanceof StepHolder) {
97 Step step = ((StepHolder) state).getStep();
98 String name = step.getName();
99 stepMap.put(name, step);
100 }
101 else if (state instanceof FlowHolder) {
102 for (Flow subflow : ((FlowHolder) state).getFlows()) {
103 findSteps(subflow, map);
104 }
105 }
106 else if (state instanceof StepLocator) {
107 StepLocator locator = (StepLocator) state;
108 for (String name : locator.getStepNames()) {
109 map.put(name, locator.getStep(name));
110 }
111 }
112 }
113
114 }
115
116
117
118
119 @Override
120 public Collection<String> getStepNames() {
121 if (!initialized) {
122 init();
123 }
124 return stepMap.keySet();
125 }
126
127
128
129
130 @Override
131 protected void doExecute(final JobExecution execution) throws JobExecutionException {
132 try {
133 JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),
134 new SimpleStepHandler(getJobRepository()), execution);
135 executor.updateJobExecutionStatus(flow.start(executor).getStatus());
136 }
137 catch (FlowExecutionException e) {
138 if (e.getCause() instanceof JobExecutionException) {
139 throw (JobExecutionException) e.getCause();
140 }
141 throw new JobExecutionException("Flow execution ended unexpectedly", e);
142 }
143 }
144
145 }