1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.database;
18
19 import java.sql.ResultSet;
20 import java.sql.SQLException;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.SortedMap;
27 import java.util.TreeMap;
28 import java.util.concurrent.CopyOnWriteArrayList;
29
30 import javax.sql.DataSource;
31
32 import org.springframework.batch.item.ExecutionContext;
33 import org.springframework.batch.item.ItemStreamException;
34 import org.springframework.beans.factory.InitializingBean;
35 import org.springframework.jdbc.core.JdbcTemplate;
36 import org.springframework.jdbc.core.RowMapper;
37 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
38 import org.springframework.util.Assert;
39 import org.springframework.util.ClassUtils;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class JdbcPagingItemReader<T> extends AbstractPagingItemReader<T> implements InitializingBean {
77 private static final String START_AFTER_VALUE = "start.after";
78
79 public static final int VALUE_NOT_SET = -1;
80
81 private DataSource dataSource;
82
83 private PagingQueryProvider queryProvider;
84
85 private Map<String, Object> parameterValues;
86
87 private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
88
89 @SuppressWarnings("rawtypes")
90 private RowMapper rowMapper;
91
92 private String firstPageSql;
93
94 private String remainingPagesSql;
95
96 private Map<String, Object> startAfterValues;
97
98 private int fetchSize = VALUE_NOT_SET;
99
100 public JdbcPagingItemReader() {
101 setName(ClassUtils.getShortName(JdbcPagingItemReader.class));
102 }
103
104 public void setDataSource(DataSource dataSource) {
105 this.dataSource = dataSource;
106 }
107
108
109
110
111
112
113
114
115
116
117 public void setFetchSize(int fetchSize) {
118 this.fetchSize = fetchSize;
119 }
120
121
122
123
124
125
126
127 public void setQueryProvider(PagingQueryProvider queryProvider) {
128 this.queryProvider = queryProvider;
129 }
130
131
132
133
134
135
136
137
138
139
140 @SuppressWarnings("rawtypes")
141 public void setRowMapper(RowMapper rowMapper) {
142 this.rowMapper = rowMapper;
143 }
144
145
146
147
148
149
150
151
152
153
154
155 public void setParameterValues(Map<String, Object> parameterValues) {
156 this.parameterValues = parameterValues;
157 }
158
159
160
161
162
163 @Override
164 public void afterPropertiesSet() throws Exception {
165 super.afterPropertiesSet();
166 Assert.notNull(dataSource);
167 JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
168 if (fetchSize != VALUE_NOT_SET) {
169 jdbcTemplate.setFetchSize(fetchSize);
170 }
171 jdbcTemplate.setMaxRows(getPageSize());
172 namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate);
173 Assert.notNull(queryProvider);
174 queryProvider.init(dataSource);
175 this.firstPageSql = queryProvider.generateFirstPageQuery(getPageSize());
176 this.remainingPagesSql = queryProvider.generateRemainingPagesQuery(getPageSize());
177 }
178
179 @Override
180 @SuppressWarnings("unchecked")
181 protected void doReadPage() {
182 if (results == null) {
183 results = new CopyOnWriteArrayList<T>();
184 }
185 else {
186 results.clear();
187 }
188
189 PagingRowMapper rowCallback = new PagingRowMapper();
190
191 List<?> query;
192
193 if (getPage() == 0) {
194 if (logger.isDebugEnabled()) {
195 logger.debug("SQL used for reading first page: [" + firstPageSql + "]");
196 }
197 if (parameterValues != null && parameterValues.size() > 0) {
198 if (this.queryProvider.isUsingNamedParameters()) {
199 query = namedParameterJdbcTemplate.query(firstPageSql,
200 getParameterMap(parameterValues, null), rowCallback);
201 }
202 else {
203 query = getJdbcTemplate().query(firstPageSql,
204 getParameterList(parameterValues, null).toArray(), rowCallback);
205 }
206 }
207 else {
208 query = getJdbcTemplate().query(firstPageSql, rowCallback);
209 }
210
211 }
212 else {
213 if (logger.isDebugEnabled()) {
214 logger.debug("SQL used for reading remaining pages: [" + remainingPagesSql + "]");
215 }
216 if (this.queryProvider.isUsingNamedParameters()) {
217 query = namedParameterJdbcTemplate.query(remainingPagesSql,
218 getParameterMap(parameterValues, startAfterValues), rowCallback);
219 }
220 else {
221 query = getJdbcTemplate().query(remainingPagesSql,
222 getParameterList(parameterValues, startAfterValues).toArray(), rowCallback);
223 }
224 }
225
226 Collection<T> result = (Collection<T>) query;
227 results.addAll(result);
228 }
229
230 @Override
231 public void update(ExecutionContext executionContext) throws ItemStreamException {
232 super.update(executionContext);
233 if (isSaveState() && startAfterValues != null) {
234 executionContext.put(getExecutionContextUserSupport().getKey(START_AFTER_VALUE), startAfterValues);
235 }
236 }
237
238 @Override
239 @SuppressWarnings("unchecked")
240 public void open(ExecutionContext executionContext) {
241 if (isSaveState()) {
242 startAfterValues = (Map<String, Object>) executionContext.get(getExecutionContextUserSupport().getKey(START_AFTER_VALUE));
243
244 if(startAfterValues == null) {
245 startAfterValues = new LinkedHashMap<String, Object>();
246 }
247 }
248
249 super.open(executionContext);
250 }
251
252 @Override
253 @SuppressWarnings({"unchecked", "rawtypes"})
254 protected void doJumpToPage(int itemIndex) {
255
256
257
258
259 if (startAfterValues == null && getPage() > 0) {
260
261 String jumpToItemSql;
262 jumpToItemSql = queryProvider.generateJumpToItemQuery(itemIndex, getPageSize());
263
264 if (logger.isDebugEnabled()) {
265 logger.debug("SQL used for jumping: [" + jumpToItemSql + "]");
266 }
267
268 RowMapper startMapper = new RowMapper() {
269 @Override
270 public Object mapRow(ResultSet rs, int i) throws SQLException {
271 return rs.getObject(1);
272 }
273 };
274 if (this.queryProvider.isUsingNamedParameters()) {
275 startAfterValues = (Map<String, Object>) namedParameterJdbcTemplate.queryForObject(jumpToItemSql,
276 getParameterMap(parameterValues, startAfterValues), startMapper);
277 }
278 else {
279 startAfterValues = (Map<String, Object>) getJdbcTemplate().queryForObject(jumpToItemSql,
280 getParameterList(parameterValues, startAfterValues).toArray(), startMapper);
281 }
282 }
283 }
284
285 private Map<String, Object> getParameterMap(Map<String, Object> values, Map<String, Object> sortKeyValues) {
286 Map<String, Object> parameterMap = new LinkedHashMap<String, Object>();
287 if (values != null) {
288 parameterMap.putAll(values);
289 }
290 if (sortKeyValues != null && !sortKeyValues.isEmpty()) {
291 for (Map.Entry<String, Object> sortKey : sortKeyValues.entrySet()) {
292 parameterMap.put("_" + sortKey.getKey(), sortKey.getValue());
293 }
294 }
295 if (logger.isDebugEnabled()) {
296 logger.debug("Using parameterMap:" + parameterMap);
297 }
298 return parameterMap;
299 }
300
301 private List<Object> getParameterList(Map<String, Object> values, Map<String, Object> sortKeyValue) {
302 SortedMap<String, Object> sm = new TreeMap<String, Object>();
303 if (values != null) {
304 sm.putAll(values);
305 }
306 List<Object> parameterList = new ArrayList<Object>();
307 parameterList.addAll(sm.values());
308 if (sortKeyValue != null && sortKeyValue.size() > 0) {
309 List<Map.Entry<String, Object>> keys = new ArrayList<Map.Entry<String,Object>>(sortKeyValue.entrySet());
310
311 for(int i = 0; i < keys.size(); i++) {
312 for(int j = 0; j < i; j++) {
313 parameterList.add(keys.get(j).getValue());
314 }
315
316 parameterList.add(keys.get(i).getValue());
317 }
318 }
319
320 if (logger.isDebugEnabled()) {
321 logger.debug("Using parameterList:" + parameterList);
322 }
323 return parameterList;
324 }
325
326 @SuppressWarnings("rawtypes")
327 private class PagingRowMapper implements RowMapper {
328 @Override
329 public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
330 startAfterValues = new LinkedHashMap<String, Object>();
331 for (Map.Entry<String, Order> sortKey : queryProvider.getSortKeys().entrySet()) {
332 startAfterValues.put(sortKey.getKey(), rs.getObject(sortKey.getKey()));
333 }
334
335 return rowMapper.mapRow(rs, rowNum);
336 }
337 }
338
339 private JdbcTemplate getJdbcTemplate() {
340 return (JdbcTemplate) namedParameterJdbcTemplate.getJdbcOperations();
341 }
342 }