1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.repository.dao;
17
18 import java.lang.reflect.Field;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.Comparator;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.springframework.batch.core.Entity;
28 import org.springframework.batch.core.JobExecution;
29 import org.springframework.batch.core.StepExecution;
30 import org.springframework.batch.support.SerializationUtils;
31 import org.springframework.dao.OptimisticLockingFailureException;
32 import org.springframework.util.Assert;
33 import org.springframework.util.ReflectionUtils;
34
35
36
37
38 public class MapStepExecutionDao implements StepExecutionDao {
39
40 private Map<Long, Map<Long, StepExecution>> executionsByJobExecutionId = new ConcurrentHashMap<Long, Map<Long,StepExecution>>();
41
42 private Map<Long, StepExecution> executionsByStepExecutionId = new ConcurrentHashMap<Long, StepExecution>();
43
44 private AtomicLong currentId = new AtomicLong();
45
46 public void clear() {
47 executionsByJobExecutionId.clear();
48 executionsByStepExecutionId.clear();
49 }
50
51 private static StepExecution copy(StepExecution original) {
52 return (StepExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original));
53 }
54
55 private static void copy(final StepExecution sourceExecution, final StepExecution targetExecution) {
56
57
58 ReflectionUtils.doWithFields(StepExecution.class, new ReflectionUtils.FieldCallback() {
59 @Override
60 public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
61 field.setAccessible(true);
62 field.set(targetExecution, field.get(sourceExecution));
63 }
64 });
65 }
66
67 @Override
68 public void saveStepExecution(StepExecution stepExecution) {
69
70 Assert.isTrue(stepExecution.getId() == null);
71 Assert.isTrue(stepExecution.getVersion() == null);
72 Assert.notNull(stepExecution.getJobExecutionId(), "JobExecution must be saved already.");
73
74 Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
75 if (executions == null) {
76 executions = new ConcurrentHashMap<Long, StepExecution>();
77 executionsByJobExecutionId.put(stepExecution.getJobExecutionId(), executions);
78 }
79
80 stepExecution.setId(currentId.incrementAndGet());
81 stepExecution.incrementVersion();
82 StepExecution copy = copy(stepExecution);
83 executions.put(stepExecution.getId(), copy);
84 executionsByStepExecutionId.put(stepExecution.getId(), copy);
85
86 }
87
88 @Override
89 public void updateStepExecution(StepExecution stepExecution) {
90
91 Assert.notNull(stepExecution.getJobExecutionId());
92
93 Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
94 Assert.notNull(executions, "step executions for given job execution are expected to be already saved");
95
96 final StepExecution persistedExecution = executionsByStepExecutionId.get(stepExecution.getId());
97 Assert.notNull(persistedExecution, "step execution is expected to be already saved");
98
99 synchronized (stepExecution) {
100 if (!persistedExecution.getVersion().equals(stepExecution.getVersion())) {
101 throw new OptimisticLockingFailureException("Attempt to update step execution id="
102 + stepExecution.getId() + " with wrong version (" + stepExecution.getVersion()
103 + "), where current version is " + persistedExecution.getVersion());
104 }
105
106 stepExecution.incrementVersion();
107 StepExecution copy = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
108 copy(stepExecution, copy);
109 executions.put(stepExecution.getId(), copy);
110 executionsByStepExecutionId.put(stepExecution.getId(), copy);
111 }
112 }
113
114 @Override
115 public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
116 return executionsByStepExecutionId.get(stepExecutionId);
117 }
118
119 @Override
120 public void addStepExecutions(JobExecution jobExecution) {
121 Map<Long, StepExecution> executions = executionsByJobExecutionId.get(jobExecution.getId());
122 if (executions == null || executions.isEmpty()) {
123 return;
124 }
125 List<StepExecution> result = new ArrayList<StepExecution>(executions.values());
126 Collections.sort(result, new Comparator<Entity>() {
127
128 @Override
129 public int compare(Entity o1, Entity o2) {
130 return Long.signum(o2.getId() - o1.getId());
131 }
132 });
133
134 List<StepExecution> copy = new ArrayList<StepExecution>(result.size());
135 for (StepExecution exec : result) {
136 copy.add(copy(exec));
137 }
138 jobExecution.addStepExecutions(copy);
139 }
140 }