1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.database;
18
19 import java.sql.Connection;
20 import java.sql.PreparedStatement;
21 import java.sql.ResultSet;
22 import java.sql.SQLException;
23 import java.sql.SQLWarning;
24 import java.sql.Statement;
25
26 import javax.sql.DataSource;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.springframework.batch.item.ExecutionContext;
31 import org.springframework.batch.item.ItemStream;
32 import org.springframework.batch.item.ReaderNotOpenException;
33 import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
34 import org.springframework.beans.factory.InitializingBean;
35 import org.springframework.dao.InvalidDataAccessApiUsageException;
36 import org.springframework.dao.InvalidDataAccessResourceUsageException;
37 import org.springframework.jdbc.SQLWarningException;
38 import org.springframework.jdbc.datasource.DataSourceUtils;
39 import org.springframework.jdbc.support.JdbcUtils;
40 import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator;
41 import org.springframework.jdbc.support.SQLExceptionTranslator;
42 import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;
43 import org.springframework.transaction.support.TransactionSynchronizationManager;
44 import org.springframework.util.Assert;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 public abstract class AbstractCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
107 implements InitializingBean {
108
109
110 protected final Log log = LogFactory.getLog(getClass());
111
112 public static final int VALUE_NOT_SET = -1;
113 private Connection con;
114
115 protected ResultSet rs;
116
117 private DataSource dataSource;
118
119 private int fetchSize = VALUE_NOT_SET;
120
121 private int maxRows = VALUE_NOT_SET;
122
123 private int queryTimeout = VALUE_NOT_SET;
124
125 private boolean ignoreWarnings = true;
126
127 private boolean verifyCursorPosition = true;
128
129 private SQLExceptionTranslator exceptionTranslator;
130
131 private boolean initialized = false;
132
133 private boolean driverSupportsAbsolute = false;
134
135 private boolean useSharedExtendedConnection = false;
136
137
138 public AbstractCursorItemReader() {
139 super();
140 }
141
142
143
144
145
146
147
148 @Override
149 public void afterPropertiesSet() throws Exception {
150 Assert.notNull(dataSource, "DataSource must be provided");
151 }
152
153
154
155
156
157
158 public void setDataSource(DataSource dataSource) {
159 this.dataSource = dataSource;
160 }
161
162
163
164
165
166
167 public DataSource getDataSource() {
168 return this.dataSource;
169 }
170
171
172
173
174
175
176
177
178
179
180
181 protected void applyStatementSettings(PreparedStatement stmt) throws SQLException {
182 if (fetchSize != VALUE_NOT_SET) {
183 stmt.setFetchSize(fetchSize);
184 stmt.setFetchDirection(ResultSet.FETCH_FORWARD);
185 }
186 if (maxRows != VALUE_NOT_SET) {
187 stmt.setMaxRows(maxRows);
188 }
189 if (queryTimeout != VALUE_NOT_SET) {
190 stmt.setQueryTimeout(queryTimeout);
191 }
192 }
193
194
195
196
197
198
199
200 protected SQLExceptionTranslator getExceptionTranslator() {
201 synchronized(this) {
202 if (exceptionTranslator == null) {
203 if (dataSource != null) {
204 exceptionTranslator = new SQLErrorCodeSQLExceptionTranslator(dataSource);
205 }
206 else {
207 exceptionTranslator = new SQLStateSQLExceptionTranslator();
208 }
209 }
210 }
211 return exceptionTranslator;
212 }
213
214
215
216
217
218
219
220
221
222
223 protected void handleWarnings(Statement statement) throws SQLWarningException,
224 SQLException {
225 if (ignoreWarnings) {
226 if (log.isDebugEnabled()) {
227 SQLWarning warningToLog = statement.getWarnings();
228 while (warningToLog != null) {
229 log.debug("SQLWarning ignored: SQL state '" + warningToLog.getSQLState() + "', error code '"
230 + warningToLog.getErrorCode() + "', message [" + warningToLog.getMessage() + "]");
231 warningToLog = warningToLog.getNextWarning();
232 }
233 }
234 }
235 else {
236 SQLWarning warnings = statement.getWarnings();
237 if (warnings != null) {
238 throw new SQLWarningException("Warning not ignored", warnings);
239 }
240 }
241 }
242
243
244
245
246
247
248 private void moveCursorToRow(int row) {
249 try {
250 int count = 0;
251 while (row != count && rs.next()) {
252 count++;
253 }
254 }
255 catch (SQLException se) {
256 throw getExceptionTranslator().translate("Attempted to move ResultSet to last committed row", getSql(), se);
257 }
258 }
259
260
261
262
263
264
265
266
267
268
269 public void setFetchSize(int fetchSize) {
270 this.fetchSize = fetchSize;
271 }
272
273
274
275
276
277
278
279
280 public void setMaxRows(int maxRows) {
281 this.maxRows = maxRows;
282 }
283
284
285
286
287
288
289
290
291
292
293 public void setQueryTimeout(int queryTimeout) {
294 this.queryTimeout = queryTimeout;
295 }
296
297
298
299
300
301
302
303 public void setIgnoreWarnings(boolean ignoreWarnings) {
304 this.ignoreWarnings = ignoreWarnings;
305 }
306
307
308
309
310
311
312
313 public void setVerifyCursorPosition(boolean verifyCursorPosition) {
314 this.verifyCursorPosition = verifyCursorPosition;
315 }
316
317
318
319
320
321
322
323
324
325
326
327
328 public void setDriverSupportsAbsolute(boolean driverSupportsAbsolute) {
329 this.driverSupportsAbsolute = driverSupportsAbsolute;
330 }
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348 public void setUseSharedExtendedConnection(boolean useSharedExtendedConnection) {
349 this.useSharedExtendedConnection = useSharedExtendedConnection;
350 }
351
352 public boolean isUseSharedExtendedConnection() {
353 return useSharedExtendedConnection;
354 }
355
356 public abstract String getSql();
357
358
359
360
361
362 private void verifyCursorPosition(long expectedCurrentRow) throws SQLException {
363 if (verifyCursorPosition) {
364 if (expectedCurrentRow != this.rs.getRow()) {
365 throw new InvalidDataAccessResourceUsageException("Unexpected cursor position change.");
366 }
367 }
368 }
369
370
371
372
373
374 @Override
375 protected void doClose() throws Exception {
376 initialized = false;
377 JdbcUtils.closeResultSet(this.rs);
378 rs = null;
379 cleanupOnClose();
380 if (useSharedExtendedConnection && dataSource instanceof ExtendedConnectionDataSourceProxy) {
381 ((ExtendedConnectionDataSourceProxy)dataSource).stopCloseSuppression(this.con);
382 if (!TransactionSynchronizationManager.isActualTransactionActive()) {
383 DataSourceUtils.releaseConnection(con, dataSource);
384 }
385 }
386 else {
387 JdbcUtils.closeConnection(this.con);
388 }
389 }
390
391 protected abstract void cleanupOnClose() throws Exception;
392
393
394
395
396 @Override
397 protected final void doOpen() throws Exception {
398
399 Assert.state(!initialized, "Stream is already initialized. Close before re-opening.");
400 Assert.isNull(rs, "ResultSet still open! Close before re-opening.");
401
402 initializeConnection();
403 openCursor(con);
404 initialized = true;
405
406 }
407
408 protected void initializeConnection() {
409 Assert.state(getDataSource() != null, "DataSource must not be null.");
410
411 try {
412 if (useSharedExtendedConnection) {
413 if (!(getDataSource() instanceof ExtendedConnectionDataSourceProxy)) {
414 throw new InvalidDataAccessApiUsageException(
415 "You must use a ExtendedConnectionDataSourceProxy for the dataSource when " +
416 "useSharedExtendedConnection is set to true.");
417 }
418 this.con = DataSourceUtils.getConnection(dataSource);
419 ((ExtendedConnectionDataSourceProxy)dataSource).startCloseSuppression(this.con);
420 }
421 else {
422 this.con = dataSource.getConnection();
423 }
424 }
425 catch (SQLException se) {
426 close();
427 throw getExceptionTranslator().translate("Executing query", getSql(), se);
428 }
429 }
430
431 protected abstract void openCursor(Connection con);
432
433
434
435
436
437 @Override
438 protected T doRead() throws Exception {
439 if (rs == null) {
440 throw new ReaderNotOpenException("Reader must be open before it can be read.");
441 }
442
443 try {
444 if (!rs.next()) {
445 return null;
446 }
447 int currentRow = getCurrentItemCount();
448 T item = readCursor(rs, currentRow);
449 verifyCursorPosition(currentRow);
450 return item;
451 }
452 catch (SQLException se) {
453 throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
454 }
455 }
456
457
458
459
460
461
462
463
464
465
466 protected abstract T readCursor(ResultSet rs, int currentRow) throws SQLException;
467
468
469
470
471
472 @Override
473 protected void jumpToItem(int itemIndex) throws Exception {
474 if (driverSupportsAbsolute) {
475 try {
476 rs.absolute(itemIndex);
477 }
478 catch (SQLException e) {
479
480
481 log.warn("The JDBC driver does not appear to support ResultSet.absolute(). Consider"
482 + " reverting to the default behavior setting the driverSupportsAbsolute to false", e);
483
484 moveCursorToRow(itemIndex);
485 }
486 }
487 else {
488 moveCursorToRow(itemIndex);
489 }
490 }
491
492 }