1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.repository.dao;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.IOException;
22 import java.sql.PreparedStatement;
23 import java.sql.ResultSet;
24 import java.sql.SQLException;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29
30 import org.springframework.batch.core.JobExecution;
31 import org.springframework.batch.core.StepExecution;
32 import org.springframework.batch.core.repository.ExecutionContextSerializer;
33 import org.springframework.batch.item.ExecutionContext;
34 import org.springframework.core.serializer.Serializer;
35 import org.springframework.jdbc.core.PreparedStatementSetter;
36 import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
37 import org.springframework.jdbc.support.lob.DefaultLobHandler;
38 import org.springframework.jdbc.support.lob.LobHandler;
39 import org.springframework.util.Assert;
40
41
42
43
44
45
46
47
48
49
50
51
52 public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implements ExecutionContextDao {
53
54 private static final String FIND_JOB_EXECUTION_CONTEXT = "SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT "
55 + "FROM %PREFIX%JOB_EXECUTION_CONTEXT WHERE JOB_EXECUTION_ID = ?";
56
57 private static final String INSERT_JOB_EXECUTION_CONTEXT = "INSERT INTO %PREFIX%JOB_EXECUTION_CONTEXT "
58 + "(SHORT_CONTEXT, SERIALIZED_CONTEXT, JOB_EXECUTION_ID) " + "VALUES(?, ?, ?)";
59
60 private static final String UPDATE_JOB_EXECUTION_CONTEXT = "UPDATE %PREFIX%JOB_EXECUTION_CONTEXT "
61 + "SET SHORT_CONTEXT = ?, SERIALIZED_CONTEXT = ? " + "WHERE JOB_EXECUTION_ID = ?";
62
63 private static final String FIND_STEP_EXECUTION_CONTEXT = "SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT "
64 + "FROM %PREFIX%STEP_EXECUTION_CONTEXT WHERE STEP_EXECUTION_ID = ?";
65
66 private static final String INSERT_STEP_EXECUTION_CONTEXT = "INSERT INTO %PREFIX%STEP_EXECUTION_CONTEXT "
67 + "(SHORT_CONTEXT, SERIALIZED_CONTEXT, STEP_EXECUTION_ID) " + "VALUES(?, ?, ?)";
68
69 private static final String UPDATE_STEP_EXECUTION_CONTEXT = "UPDATE %PREFIX%STEP_EXECUTION_CONTEXT "
70 + "SET SHORT_CONTEXT = ?, SERIALIZED_CONTEXT = ? " + "WHERE STEP_EXECUTION_ID = ?";
71
72 private static final int DEFAULT_MAX_VARCHAR_LENGTH = 2500;
73
74 private int shortContextLength = DEFAULT_MAX_VARCHAR_LENGTH;
75
76 private LobHandler lobHandler = new DefaultLobHandler();
77
78 private ExecutionContextSerializer serializer;
79
80
81
82
83
84
85 public void setSerializer(ExecutionContextSerializer serializer) {
86 this.serializer = serializer;
87 }
88
89
90
91
92
93
94
95
96
97
98
99 public void setShortContextLength(int shortContextLength) {
100 this.shortContextLength = shortContextLength;
101 }
102
103 @Override
104 public ExecutionContext getExecutionContext(JobExecution jobExecution) {
105 Long executionId = jobExecution.getId();
106 Assert.notNull(executionId, "ExecutionId must not be null.");
107
108 List<ExecutionContext> results = getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTION_CONTEXT),
109 new ExecutionContextRowMapper(), executionId);
110 if (results.size() > 0) {
111 return results.get(0);
112 }
113 else {
114 return new ExecutionContext();
115 }
116 }
117
118 @Override
119 public ExecutionContext getExecutionContext(StepExecution stepExecution) {
120 Long executionId = stepExecution.getId();
121 Assert.notNull(executionId, "ExecutionId must not be null.");
122
123 List<ExecutionContext> results = getJdbcTemplate().query(getQuery(FIND_STEP_EXECUTION_CONTEXT),
124 new ExecutionContextRowMapper(), executionId);
125 if (results.size() > 0) {
126 return results.get(0);
127 }
128 else {
129 return new ExecutionContext();
130 }
131 }
132
133 @Override
134 public void updateExecutionContext(final JobExecution jobExecution) {
135 Long executionId = jobExecution.getId();
136 ExecutionContext executionContext = jobExecution.getExecutionContext();
137 Assert.notNull(executionId, "ExecutionId must not be null.");
138 Assert.notNull(executionContext, "The ExecutionContext must not be null.");
139
140 String serializedContext = serializeContext(executionContext);
141
142 persistSerializedContext(executionId, serializedContext, UPDATE_JOB_EXECUTION_CONTEXT);
143 }
144
145 @Override
146 public void updateExecutionContext(final StepExecution stepExecution) {
147
148 Long executionId = stepExecution.getId();
149 ExecutionContext executionContext = stepExecution.getExecutionContext();
150 Assert.notNull(executionId, "ExecutionId must not be null.");
151 Assert.notNull(executionContext, "The ExecutionContext must not be null.");
152
153 String serializedContext = serializeContext(executionContext);
154
155 persistSerializedContext(executionId, serializedContext, UPDATE_STEP_EXECUTION_CONTEXT);
156 }
157
158 @Override
159 public void saveExecutionContext(JobExecution jobExecution) {
160
161 Long executionId = jobExecution.getId();
162 ExecutionContext executionContext = jobExecution.getExecutionContext();
163 Assert.notNull(executionId, "ExecutionId must not be null.");
164 Assert.notNull(executionContext, "The ExecutionContext must not be null.");
165
166 String serializedContext = serializeContext(executionContext);
167
168 persistSerializedContext(executionId, serializedContext, INSERT_JOB_EXECUTION_CONTEXT);
169 }
170
171 @Override
172 public void saveExecutionContext(StepExecution stepExecution) {
173 Long executionId = stepExecution.getId();
174 ExecutionContext executionContext = stepExecution.getExecutionContext();
175 Assert.notNull(executionId, "ExecutionId must not be null.");
176 Assert.notNull(executionContext, "The ExecutionContext must not be null.");
177
178 String serializedContext = serializeContext(executionContext);
179
180 persistSerializedContext(executionId, serializedContext, INSERT_STEP_EXECUTION_CONTEXT);
181 }
182
183 public void setLobHandler(LobHandler lobHandler) {
184 this.lobHandler = lobHandler;
185 }
186
187 @Override
188 public void afterPropertiesSet() throws Exception {
189 super.afterPropertiesSet();
190 }
191
192
193
194
195
196
197 private void persistSerializedContext(final Long executionId, String serializedContext, String sql) {
198
199 final String shortContext;
200 final String longContext;
201 if (serializedContext.length() > shortContextLength) {
202
203
204 shortContext = serializedContext.substring(0, shortContextLength - 8) + " ...";
205 longContext = serializedContext;
206 }
207 else {
208 shortContext = serializedContext;
209 longContext = null;
210 }
211
212 getJdbcTemplate().update(getQuery(sql), new PreparedStatementSetter() {
213 @Override
214 public void setValues(PreparedStatement ps) throws SQLException {
215 ps.setString(1, shortContext);
216 if (longContext != null) {
217 lobHandler.getLobCreator().setClobAsString(ps, 2, longContext);
218 }
219 else {
220 ps.setNull(2, getClobTypeToUse());
221 }
222 ps.setLong(3, executionId);
223 }
224 });
225 }
226
227 @SuppressWarnings("unchecked")
228 private String serializeContext(ExecutionContext ctx) {
229 Map<String, Object> m = new HashMap<String, Object>();
230 for (Entry<String, Object> me : ctx.entrySet()) {
231 m.put(me.getKey(), me.getValue());
232 }
233
234 ByteArrayOutputStream out = new ByteArrayOutputStream();
235 String results = "";
236
237 try {
238 serializer.serialize(m, out);
239 results = new String(out.toByteArray(), "ISO-8859-1");
240 }
241 catch (IOException ioe) {
242 throw new IllegalArgumentException("Could not serialize the execution context", ioe);
243 }
244
245 return results;
246 }
247
248 @SuppressWarnings("unchecked")
249 private class ExecutionContextRowMapper implements ParameterizedRowMapper<ExecutionContext> {
250
251 @Override
252 public ExecutionContext mapRow(ResultSet rs, int i) throws SQLException {
253 ExecutionContext executionContext = new ExecutionContext();
254 String serializedContext = rs.getString("SERIALIZED_CONTEXT");
255 if (serializedContext == null) {
256 serializedContext = rs.getString("SHORT_CONTEXT");
257 }
258
259 Map<String, Object> map;
260 try {
261 ByteArrayInputStream in = new ByteArrayInputStream(serializedContext.getBytes("ISO-8859-1"));
262 map = (Map<String, Object>) serializer.deserialize(in);
263 }
264 catch (IOException ioe) {
265 throw new IllegalArgumentException("Unable to deserialize the execution context", ioe);
266 }
267 for (Map.Entry<String, Object> entry : map.entrySet()) {
268 executionContext.put(entry.getKey(), entry.getValue());
269 }
270 return executionContext;
271 }
272 }
273
274 }