1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.partition.support;
18
19 import java.util.HashSet;
20 import java.util.Set;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24
25 import org.springframework.batch.core.BatchStatus;
26 import org.springframework.batch.core.ExitStatus;
27 import org.springframework.batch.core.Step;
28 import org.springframework.batch.core.StepExecution;
29 import org.springframework.batch.core.partition.PartitionHandler;
30 import org.springframework.batch.core.step.StepHolder;
31 import org.springframework.beans.factory.InitializingBean;
32 import org.springframework.beans.factory.annotation.Required;
33 import org.springframework.core.task.SyncTaskExecutor;
34 import org.springframework.core.task.TaskExecutor;
35 import org.springframework.core.task.TaskRejectedException;
36 import org.springframework.util.Assert;
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public class TaskExecutorPartitionHandler extends AbstractPartitionHandler implements StepHolder, InitializingBean {
51
52 private TaskExecutor taskExecutor = new SyncTaskExecutor();
53
54 private Step step;
55
56 @Override
57 public void afterPropertiesSet() throws Exception {
58 }
59
60
61
62
63
64
65 public void setTaskExecutor(TaskExecutor taskExecutor) {
66 this.taskExecutor = taskExecutor;
67 }
68
69
70
71
72
73
74
75
76
77 @Required
78 public void setStep(Step step) {
79 this.step = step;
80 }
81
82
83
84
85
86
87
88 @Override
89 public Step getStep() {
90 return this.step;
91 }
92
93 @Override
94 protected Set<StepExecution> doHandle(StepExecution masterStepExecution,
95 Set<StepExecution> partitionStepExecutions) throws Exception {
96 Assert.notNull(step, "A Step must be provided.");
97 final Set<Future<StepExecution>> tasks = new HashSet<Future<StepExecution>>(getGridSize());
98 final Set<StepExecution> result = new HashSet<StepExecution>();
99
100 for (final StepExecution stepExecution : partitionStepExecutions) {
101 final FutureTask<StepExecution> task = createTask(step, stepExecution);
102
103 try {
104 taskExecutor.execute(task);
105 tasks.add(task);
106 } catch (TaskRejectedException e) {
107
108 ExitStatus exitStatus = ExitStatus.FAILED
109 .addExitDescription("TaskExecutor rejected the task for this step.");
110
111
112
113
114 stepExecution.setStatus(BatchStatus.FAILED);
115 stepExecution.setExitStatus(exitStatus);
116 result.add(stepExecution);
117 }
118 }
119
120 for (Future<StepExecution> task : tasks) {
121 result.add(task.get());
122 }
123
124 return result;
125 }
126
127
128
129
130
131
132
133
134 protected FutureTask<StepExecution> createTask(final Step step,
135 final StepExecution stepExecution) {
136 return new FutureTask<StepExecution>(new Callable<StepExecution>() {
137 @Override
138 public StepExecution call() throws Exception {
139 step.execute(stepExecution);
140 return stepExecution;
141 }
142 });
143 }
144
145 }