1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.database;
18
19 import java.sql.CallableStatement;
20 import java.sql.Connection;
21 import java.sql.ResultSet;
22 import java.sql.SQLException;
23 import java.sql.Types;
24 import java.util.Arrays;
25
26 import org.springframework.jdbc.core.PreparedStatementSetter;
27 import org.springframework.jdbc.core.RowMapper;
28 import org.springframework.jdbc.core.SqlOutParameter;
29 import org.springframework.jdbc.core.SqlParameter;
30 import org.springframework.jdbc.core.metadata.CallMetaDataContext;
31 import org.springframework.jdbc.support.JdbcUtils;
32 import org.springframework.util.Assert;
33 import org.springframework.util.ClassUtils;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 @SuppressWarnings("rawtypes")
59 public class StoredProcedureItemReader<T> extends AbstractCursorItemReader<T> {
60
61 private CallableStatement callableStatement;
62
63 private PreparedStatementSetter preparedStatementSetter;
64
65 private String procedureName;
66
67 private String callString;
68
69 private RowMapper rowMapper;
70
71 private SqlParameter[] parameters = new SqlParameter[0];
72
73 private boolean function = false;
74
75 private int refCursorPosition = 0;
76
77 public StoredProcedureItemReader() {
78 super();
79 setName(ClassUtils.getShortName(StoredProcedureItemReader.class));
80 }
81
82
83
84
85
86
87 public void setRowMapper(RowMapper rowMapper) {
88 this.rowMapper = rowMapper;
89 }
90
91
92
93
94
95
96
97
98 public void setProcedureName(String sprocedureName) {
99 this.procedureName = sprocedureName;
100 }
101
102
103
104
105
106
107
108 public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
109 this.preparedStatementSetter = preparedStatementSetter;
110 }
111
112
113
114
115
116
117
118 public void setParameters(SqlParameter[] parameters) {
119 this.parameters = parameters;
120 }
121
122
123
124
125 public void setFunction(boolean function) {
126 this.function = function;
127 }
128
129
130
131
132
133
134
135
136 public void setRefCursorPosition(int refCursorPosition) {
137 this.refCursorPosition = refCursorPosition;
138 }
139
140
141
142
143
144
145
146 @Override
147 public void afterPropertiesSet() throws Exception {
148 super.afterPropertiesSet();
149 Assert.notNull(procedureName, "The name of the stored procedure must be provided");
150 Assert.notNull(rowMapper, "RowMapper must be provided");
151 }
152
153 @Override
154 protected void openCursor(Connection con) {
155
156 Assert.state(procedureName != null, "Procedure Name must not be null.");
157 Assert.state(refCursorPosition >= 0,
158 "invalid refCursorPosition specified as " + refCursorPosition + "; it can't be " +
159 "specified as a negative number.");
160 Assert.state(refCursorPosition == 0 || refCursorPosition > 0,
161 "invalid refCursorPosition specified as " + refCursorPosition + "; there are " +
162 parameters.length + " parameters defined.");
163
164 CallMetaDataContext callContext = new CallMetaDataContext();
165 callContext.setAccessCallParameterMetaData(false);
166 callContext.setProcedureName(procedureName);
167 callContext.setFunction(function);
168 callContext.initializeMetaData(getDataSource());
169 callContext.processParameters(Arrays.asList(parameters));
170 SqlParameter cursorParameter = callContext.createReturnResultSetParameter("cursor", rowMapper);
171 this.callString = callContext.createCallString();
172
173 log.debug("Call string is: " + callString);
174
175 int cursorSqlType = Types.OTHER;
176 if (function) {
177 if (cursorParameter instanceof SqlOutParameter) {
178 cursorSqlType = cursorParameter.getSqlType();
179 }
180 }
181 else {
182 if (refCursorPosition > 0 && refCursorPosition <= parameters.length) {
183 cursorSqlType = parameters[refCursorPosition - 1].getSqlType();
184 }
185 }
186
187 try {
188 if (isUseSharedExtendedConnection()) {
189 callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
190 ResultSet.HOLD_CURSORS_OVER_COMMIT);
191 }
192 else {
193 callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
194 }
195 applyStatementSettings(callableStatement);
196 if (this.preparedStatementSetter != null) {
197 preparedStatementSetter.setValues(callableStatement);
198 }
199
200 if (function) {
201 callableStatement.registerOutParameter(1, cursorSqlType);
202 }
203 else {
204 if (refCursorPosition > 0) {
205 callableStatement.registerOutParameter(refCursorPosition, cursorSqlType);
206 }
207 }
208 boolean results = callableStatement.execute();
209 if (results) {
210 rs = callableStatement.getResultSet();
211 }
212 else {
213 if (function) {
214 rs = (ResultSet) callableStatement.getObject(1);
215 }
216 else {
217 rs = (ResultSet) callableStatement.getObject(refCursorPosition);
218 }
219 }
220 handleWarnings(callableStatement);
221 }
222 catch (SQLException se) {
223 close();
224 throw getExceptionTranslator().translate("Executing stored procedure", getSql(), se);
225 }
226
227 }
228
229 @Override
230 @SuppressWarnings("unchecked")
231 protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
232 return (T) rowMapper.mapRow(rs, currentRow);
233 }
234
235
236
237
238 @Override
239 protected void cleanupOnClose() throws Exception {
240 JdbcUtils.closeStatement(this.callableStatement);
241 }
242
243 @Override
244 public String getSql() {
245 if (callString != null) {
246 return this.callString;
247 }
248 else {
249 return "PROCEDURE NAME: " + procedureName;
250 }
251 }
252
253 }