1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.step.builder;
17
18 import org.springframework.batch.core.Step;
19 import org.springframework.batch.core.partition.PartitionHandler;
20 import org.springframework.batch.core.partition.StepExecutionSplitter;
21 import org.springframework.batch.core.partition.support.PartitionStep;
22 import org.springframework.batch.core.partition.support.Partitioner;
23 import org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter;
24 import org.springframework.batch.core.partition.support.StepExecutionAggregator;
25 import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
26 import org.springframework.core.task.SyncTaskExecutor;
27 import org.springframework.core.task.TaskExecutor;
28
29
30
31
32
33
34
35
36
37 public class PartitionStepBuilder extends StepBuilderHelper<PartitionStepBuilder> {
38
39 private TaskExecutor taskExecutor;
40
41 private Partitioner partitioner;
42
43 private static final int DEFAULT_GRID_SIZE = 6;
44
45 private Step step;
46
47 private PartitionHandler partitionHandler;
48
49 private int gridSize = DEFAULT_GRID_SIZE;
50
51 private StepExecutionSplitter splitter;
52
53 private StepExecutionAggregator aggregator;
54
55 private String stepName;
56
57
58
59
60
61
62 public PartitionStepBuilder(StepBuilderHelper<?> parent) {
63 super(parent);
64 }
65
66
67
68
69
70
71
72
73
74 public PartitionStepBuilder partitioner(String slaveStepName, Partitioner partitioner) {
75 this.stepName = slaveStepName;
76 this.partitioner = partitioner;
77 return this;
78 }
79
80
81
82
83
84
85
86
87
88 public PartitionStepBuilder step(Step step) {
89 this.step = step;
90 return this;
91 }
92
93
94
95
96
97
98
99
100
101 public PartitionStepBuilder taskExecutor(TaskExecutor taskExecutor) {
102 this.taskExecutor = taskExecutor;
103 return this;
104 }
105
106
107
108
109
110
111
112
113
114
115
116 public PartitionStepBuilder partitionHandler(PartitionHandler partitionHandler) {
117 this.partitionHandler = partitionHandler;
118 return this;
119 }
120
121
122
123
124
125
126
127
128
129 public PartitionStepBuilder gridSize(int gridSize) {
130 this.gridSize = gridSize;
131 return this;
132 }
133
134
135
136
137
138
139
140
141 public PartitionStepBuilder splitter(StepExecutionSplitter splitter) {
142 this.splitter = splitter;
143 return this;
144 }
145
146
147
148
149
150
151
152
153 public PartitionStepBuilder aggregator(StepExecutionAggregator aggregator) {
154 this.aggregator = aggregator;
155 return this;
156 }
157
158 public Step build() {
159
160 PartitionStep step = new PartitionStep();
161 step.setName(getName());
162 super.enhance(step);
163
164 if (partitionHandler != null) {
165 step.setPartitionHandler(partitionHandler);
166 }
167 else {
168 TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
169 partitionHandler.setStep(this.step);
170 if (taskExecutor == null) {
171 taskExecutor = new SyncTaskExecutor();
172 }
173 partitionHandler.setGridSize(gridSize);
174 partitionHandler.setTaskExecutor(taskExecutor);
175 step.setPartitionHandler(partitionHandler);
176 }
177
178 if (splitter != null) {
179 step.setStepExecutionSplitter(splitter);
180 }
181 else {
182
183 boolean allowStartIfComplete = isAllowStartIfComplete();
184 String name = stepName;
185 if (this.step != null) {
186 try {
187 allowStartIfComplete = this.step.isAllowStartIfComplete();
188 name = this.step.getName();
189 }
190 catch (Exception e) {
191 logger.info("Ignored exception from step asking for name and allowStartIfComplete flag. "
192 + "Using default from enclosing PartitionStep (" + name + "," + allowStartIfComplete + ").");
193 }
194 }
195 SimpleStepExecutionSplitter splitter = new SimpleStepExecutionSplitter();
196 splitter.setPartitioner(partitioner);
197 splitter.setJobRepository(getJobRepository());
198 splitter.setAllowStartIfComplete(allowStartIfComplete);
199 splitter.setStepName(name);
200 this.splitter = splitter;
201 step.setStepExecutionSplitter(splitter);
202
203 }
204
205 if (aggregator != null) {
206 step.setStepExecutionAggregator(aggregator);
207 }
208
209 try {
210 step.afterPropertiesSet();
211 }
212 catch (Exception e) {
213 throw new StepBuilderException(e);
214 }
215
216 return step;
217
218 }
219
220 }