1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.job.flow;
18
19 import org.springframework.batch.core.BatchStatus;
20 import org.springframework.batch.core.ExitStatus;
21 import org.springframework.batch.core.JobExecution;
22 import org.springframework.batch.core.JobInterruptedException;
23 import org.springframework.batch.core.StartLimitExceededException;
24 import org.springframework.batch.core.Step;
25 import org.springframework.batch.core.StepExecution;
26 import org.springframework.batch.core.job.StepHandler;
27 import org.springframework.batch.core.repository.JobRepository;
28 import org.springframework.batch.core.repository.JobRestartException;
29
30
31
32
33
34
35
36
37 public class JobFlowExecutor implements FlowExecutor {
38
39 private final ThreadLocal<StepExecution> stepExecutionHolder = new ThreadLocal<StepExecution>();
40
41 private final JobExecution execution;
42
43 private ExitStatus exitStatus = ExitStatus.EXECUTING;
44
45 private final StepHandler stepHandler;
46
47 private final JobRepository jobRepository;
48
49
50
51
52 public JobFlowExecutor(JobRepository jobRepository, StepHandler stepHandler, JobExecution execution) {
53 this.jobRepository = jobRepository;
54 this.stepHandler = stepHandler;
55 this.execution = execution;
56 stepExecutionHolder.set(null);
57 }
58
59 @Override
60 public String executeStep(Step step) throws JobInterruptedException, JobRestartException,
61 StartLimitExceededException {
62 StepExecution stepExecution = stepHandler.handleStep(step, execution);
63 stepExecutionHolder.set(stepExecution);
64 if (stepExecution == null) {
65 return ExitStatus.COMPLETED.getExitCode();
66 }
67 if (stepExecution.isTerminateOnly()) {
68 throw new JobInterruptedException("Step requested termination: "+stepExecution, stepExecution.getStatus());
69 }
70 return stepExecution.getExitStatus().getExitCode();
71 }
72
73 @Override
74 public void abandonStepExecution() {
75 StepExecution lastStepExecution = stepExecutionHolder.get();
76 if (lastStepExecution != null && lastStepExecution.getStatus().isGreaterThan(BatchStatus.STOPPING)) {
77 lastStepExecution.upgradeStatus(BatchStatus.ABANDONED);
78 jobRepository.update(lastStepExecution);
79 }
80 }
81
82 @Override
83 public void updateJobExecutionStatus(FlowExecutionStatus status) {
84 execution.setStatus(findBatchStatus(status));
85 exitStatus = exitStatus.and(new ExitStatus(status.getName()));
86 execution.setExitStatus(exitStatus);
87 }
88
89 @Override
90 public JobExecution getJobExecution() {
91 return execution;
92 }
93
94 @Override
95 public StepExecution getStepExecution() {
96 return stepExecutionHolder.get();
97 }
98
99 @Override
100 public void close(FlowExecution result) {
101 stepExecutionHolder.set(null);
102 }
103
104 @Override
105 public boolean isRestart() {
106 if (getStepExecution() != null && getStepExecution().getStatus() == BatchStatus.ABANDONED) {
107
108
109
110
111
112 return true;
113 }
114 return execution.getStepExecutions().isEmpty();
115 }
116
117 @Override
118 public void addExitStatus(String code) {
119 exitStatus = exitStatus.and(new ExitStatus(code));
120 }
121
122
123
124
125
126 private BatchStatus findBatchStatus(FlowExecutionStatus status) {
127 for (BatchStatus batchStatus : BatchStatus.values()) {
128 if (status.getName().startsWith(batchStatus.toString())) {
129 return batchStatus;
130 }
131 }
132 return BatchStatus.UNKNOWN;
133 }
134
135 }