1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.sample.common;
18
19 import java.sql.ResultSet;
20 import java.sql.SQLException;
21 import java.util.Iterator;
22 import java.util.List;
23
24 import javax.sql.DataSource;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.springframework.batch.core.ExitStatus;
29 import org.springframework.batch.core.StepExecution;
30 import org.springframework.batch.core.StepExecutionListener;
31 import org.springframework.batch.item.ItemReader;
32 import org.springframework.batch.item.ReaderNotOpenException;
33 import org.springframework.batch.support.SerializationUtils;
34 import org.springframework.beans.factory.DisposableBean;
35 import org.springframework.beans.factory.InitializingBean;
36 import org.springframework.dao.DataAccessException;
37 import org.springframework.jdbc.core.JdbcOperations;
38 import org.springframework.jdbc.core.JdbcTemplate;
39 import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
40 import org.springframework.util.Assert;
41
42
43
44
45
46
47
48 public class StagingItemReader<T> implements ItemReader<ProcessIndicatorItemWrapper<T>>, StepExecutionListener,
49 InitializingBean, DisposableBean {
50
51 private static Log logger = LogFactory.getLog(StagingItemReader.class);
52
53 private StepExecution stepExecution;
54
55 private final Object lock = new Object();
56
57 private volatile boolean initialized = false;
58
59 private volatile Iterator<Long> keys;
60
61 private JdbcOperations jdbcTemplate;
62
63 public void setDataSource(DataSource dataSource) {
64 jdbcTemplate = new JdbcTemplate(dataSource);
65 }
66
67 public void destroy() throws Exception {
68 initialized = false;
69 keys = null;
70 }
71
72 public final void afterPropertiesSet() throws Exception {
73 Assert.notNull(jdbcTemplate, "You must provide a DataSource.");
74 }
75
76 private List<Long> retrieveKeys() {
77
78 synchronized (lock) {
79
80 return jdbcTemplate.query(
81
82 "SELECT ID FROM BATCH_STAGING WHERE JOB_ID=? AND PROCESSED=? ORDER BY ID",
83
84 new ParameterizedRowMapper<Long>() {
85 public Long mapRow(ResultSet rs, int rowNum) throws SQLException {
86 return rs.getLong(1);
87 }
88 },
89
90 stepExecution.getJobExecution().getJobId(), StagingItemWriter.NEW);
91
92 }
93
94 }
95
96 public ProcessIndicatorItemWrapper<T> read() throws DataAccessException {
97
98 if (!initialized) {
99 throw new ReaderNotOpenException("Reader must be open before it can be used.");
100 }
101
102 Long id = null;
103 synchronized (lock) {
104 if (keys.hasNext()) {
105 id = keys.next();
106 }
107 }
108 logger.debug("Retrieved key from list: " + id);
109
110 if (id == null) {
111 return null;
112 }
113 @SuppressWarnings("unchecked")
114 T result = (T) jdbcTemplate.queryForObject("SELECT VALUE FROM BATCH_STAGING WHERE ID=?",
115 new ParameterizedRowMapper<Object>() {
116 public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
117 byte[] blob = rs.getBytes(1);
118 return SerializationUtils.deserialize(blob);
119 }
120 }, id);
121
122 return new ProcessIndicatorItemWrapper<T>(id, result);
123
124 }
125
126 public ExitStatus afterStep(StepExecution stepExecution) {
127 return null;
128 }
129
130 public void beforeStep(StepExecution stepExecution) {
131 this.stepExecution = stepExecution;
132 synchronized (lock) {
133 if (keys == null) {
134 keys = retrieveKeys().iterator();
135 logger.info("Keys obtained for staging.");
136 initialized = true;
137 }
138 }
139 }
140
141 }