1 /*
2 * Copyright 2006-2013 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.springframework.batch.item.database;
17
18 import java.util.Map;
19
20 import org.hibernate.ScrollableResults;
21 import org.hibernate.Session;
22 import org.hibernate.SessionFactory;
23 import org.hibernate.StatelessSession;
24 import org.springframework.batch.item.ExecutionContext;
25 import org.springframework.batch.item.ItemReader;
26 import org.springframework.batch.item.ItemStream;
27 import org.springframework.batch.item.ItemStreamException;
28 import org.springframework.batch.item.database.orm.HibernateQueryProvider;
29 import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
30 import org.springframework.beans.factory.InitializingBean;
31 import org.springframework.util.Assert;
32 import org.springframework.util.ClassUtils;
33
34 /**
35 * {@link ItemReader} for reading database records built on top of Hibernate. It
36 * executes the HQL query when initialized iterates over the result set as
37 * {@link #read()} method is called, returning an object corresponding to
38 * current row. The query can be set directly using
39 * {@link #setQueryString(String)}, a named query can be used by
40 * {@link #setQueryName(String)}, or a query provider strategy can be supplied
41 * via {@link #setQueryProvider(HibernateQueryProvider)}.
42 *
43 *
44 * <p>
45 * The reader can be configured to use either {@link StatelessSession}
46 * sufficient for simple mappings without the need to cascade to associated
47 * objects or standard hibernate {@link Session} for more advanced mappings or
48 * when caching is desired. When stateful session is used it will be cleared in
49 * the {@link #update(ExecutionContext)} method without being flushed (no data
50 * modifications are expected).
51 * </p>
52 *
53 * The implementation is <b>not</b> thread-safe.
54 *
55 * @author Robert Kasanicky
56 * @author Dave Syer
57 */
58 public class HibernateCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements ItemStream,
59 InitializingBean {
60
61 private HibernateItemReaderHelper<T> helper = new HibernateItemReaderHelper<T>();
62
63 public HibernateCursorItemReader() {
64 setName(ClassUtils.getShortName(HibernateCursorItemReader.class));
65 }
66
67 private ScrollableResults cursor;
68
69 private boolean initialized = false;
70
71 private int fetchSize;
72
73 private Map<String, Object> parameterValues;
74
75 @Override
76 public void afterPropertiesSet() throws Exception {
77 Assert.state(fetchSize >= 0, "fetchSize must not be negative");
78 helper.afterPropertiesSet();
79 }
80
81 /**
82 * The parameter values to apply to a query (map of name:value).
83 *
84 * @param parameterValues the parameter values to set
85 */
86 public void setParameterValues(Map<String, Object> parameterValues) {
87 this.parameterValues = parameterValues;
88 }
89
90 /**
91 * A query name for an externalized query. Either this or the {
92 * {@link #setQueryString(String) query string} or the {
93 * {@link #setQueryProvider(HibernateQueryProvider) query provider} should
94 * be set.
95 *
96 * @param queryName name of a hibernate named query
97 */
98 public void setQueryName(String queryName) {
99 helper.setQueryName(queryName);
100 }
101
102 /**
103 * Fetch size used internally by Hibernate to limit amount of data fetched
104 * from database per round trip.
105 *
106 * @param fetchSize the fetch size to pass down to Hibernate
107 */
108 public void setFetchSize(int fetchSize) {
109 this.fetchSize = fetchSize;
110 }
111
112 /**
113 * A query provider. Either this or the {{@link #setQueryString(String)
114 * query string} or the {{@link #setQueryName(String) query name} should be
115 * set.
116 *
117 * @param queryProvider Hibernate query provider
118 */
119 public void setQueryProvider(HibernateQueryProvider queryProvider) {
120 helper.setQueryProvider(queryProvider);
121 }
122
123 /**
124 * A query string in HQL. Either this or the {
125 * {@link #setQueryProvider(HibernateQueryProvider) query provider} or the {
126 * {@link #setQueryName(String) query name} should be set.
127 *
128 * @param queryString HQL query string
129 */
130 public void setQueryString(String queryString) {
131 helper.setQueryString(queryString);
132 }
133
134 /**
135 * The Hibernate SessionFactory to use the create a session.
136 *
137 * @param sessionFactory the {@link SessionFactory} to set
138 */
139 public void setSessionFactory(SessionFactory sessionFactory) {
140 helper.setSessionFactory(sessionFactory);
141 }
142
143 /**
144 * Can be set only in uninitialized state.
145 *
146 * @param useStatelessSession <code>true</code> to use
147 * {@link StatelessSession} <code>false</code> to use standard hibernate
148 * {@link Session}
149 */
150 public void setUseStatelessSession(boolean useStatelessSession) {
151 helper.setUseStatelessSession(useStatelessSession);
152 }
153
154 @Override
155 protected T doRead() throws Exception {
156 if (cursor.next()) {
157 Object[] data = cursor.get();
158
159 if (data.length > 1) {
160 // If there are multiple items this must be a projection
161 // and T is an array type.
162 @SuppressWarnings("unchecked")
163 T item = (T) data;
164 return item;
165 }
166 else {
167 // Assume if there is only one item that it is the data the user
168 // wants.
169 // If there is only one item this is going to be a nasty shock
170 // if T is an array type but there's not much else we can do...
171 @SuppressWarnings("unchecked")
172 T item = (T) data[0];
173 return item;
174 }
175
176 }
177 return null;
178 }
179
180 /**
181 * Open hibernate session and create a forward-only cursor for the query.
182 */
183 @Override
184 protected void doOpen() throws Exception {
185 Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
186 cursor = helper.getForwardOnlyCursor(fetchSize, parameterValues);
187 initialized = true;
188 }
189
190 /**
191 * Update the context and clear the session if stateful.
192 *
193 * @param executionContext the current {@link ExecutionContext}
194 * @throws ItemStreamException if there is a problem
195 */
196 @Override
197 public void update(ExecutionContext executionContext) throws ItemStreamException {
198 super.update(executionContext);
199 helper.clear();
200 }
201
202 /**
203 * Wind forward through the result set to the item requested. Also clears
204 * the session every now and then (if stateful) to avoid memory problems.
205 * The frequency of session clearing is the larger of the fetch size (if
206 * set) and 100.
207 *
208 * @param itemIndex the first item to read
209 * @throws Exception if there is a problem
210 * @see AbstractItemCountingItemStreamItemReader#jumpToItem(int)
211 */
212 @Override
213 protected void jumpToItem(int itemIndex) throws Exception {
214 int flushSize = Math.max(fetchSize, 100);
215 helper.jumpToItem(cursor, itemIndex, flushSize);
216 }
217
218 /**
219 * Close the cursor and hibernate session.
220 */
221 @Override
222 protected void doClose() throws Exception {
223
224 initialized = false;
225
226 if (cursor != null) {
227 cursor.close();
228 }
229
230 helper.close();
231
232 }
233 }