1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.job.flow.support.state;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24
25 import org.springframework.batch.core.job.flow.Flow;
26 import org.springframework.batch.core.job.flow.FlowExecution;
27 import org.springframework.batch.core.job.flow.FlowExecutionException;
28 import org.springframework.batch.core.job.flow.FlowExecutionStatus;
29 import org.springframework.batch.core.job.flow.FlowExecutor;
30 import org.springframework.batch.core.job.flow.FlowHolder;
31 import org.springframework.batch.core.job.flow.State;
32 import org.springframework.core.task.SyncTaskExecutor;
33 import org.springframework.core.task.TaskExecutor;
34 import org.springframework.core.task.TaskRejectedException;
35
36
37
38
39
40
41
42
43 public class SplitState extends AbstractState implements FlowHolder {
44
45 private final Collection<Flow> flows;
46
47 private TaskExecutor taskExecutor = new SyncTaskExecutor();
48
49 private FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator();
50
51
52
53
54 public SplitState(Collection<Flow> flows, String name) {
55 super(name);
56 this.flows = flows;
57 }
58
59
60
61
62
63 public void setTaskExecutor(TaskExecutor taskExecutor) {
64 this.taskExecutor = taskExecutor;
65 }
66
67
68
69
70 @Override
71 public Collection<Flow> getFlows() {
72 return flows;
73 }
74
75
76
77
78
79
80
81 @Override
82 public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception {
83
84
85
86 Collection<Future<FlowExecution>> tasks = new ArrayList<Future<FlowExecution>>();
87
88 for (final Flow flow : flows) {
89
90 final FutureTask<FlowExecution> task = new FutureTask<FlowExecution>(new Callable<FlowExecution>() {
91 @Override
92 public FlowExecution call() throws Exception {
93 return flow.start(executor);
94 }
95 });
96
97 tasks.add(task);
98
99 try {
100 taskExecutor.execute(task);
101 }
102 catch (TaskRejectedException e) {
103 throw new FlowExecutionException("TaskExecutor rejected task for flow=" + flow.getName());
104 }
105
106 }
107
108 Collection<FlowExecution> results = new ArrayList<FlowExecution>();
109
110
111 for (Future<FlowExecution> task : tasks) {
112 try {
113 results.add(task.get());
114 }
115 catch (ExecutionException e) {
116
117 Throwable cause = e.getCause();
118 if (cause instanceof Exception) {
119 throw (Exception) cause;
120 } else {
121 throw e;
122 }
123 }
124 }
125
126 return aggregator.aggregate(results);
127
128 }
129
130
131
132
133
134
135 @Override
136 public boolean isEndState() {
137 return false;
138 }
139 }