1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.sample.common;
18
19 import java.io.Serializable;
20 import java.sql.PreparedStatement;
21 import java.sql.SQLException;
22 import java.util.List;
23 import java.util.ListIterator;
24
25 import org.springframework.batch.core.ExitStatus;
26 import org.springframework.batch.core.StepExecution;
27 import org.springframework.batch.core.StepExecutionListener;
28 import org.springframework.batch.item.ItemWriter;
29 import org.springframework.batch.support.SerializationUtils;
30 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
31 import org.springframework.jdbc.core.support.JdbcDaoSupport;
32 import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
33 import org.springframework.util.Assert;
34 import org.springframework.util.ClassUtils;
35
36
37
38
39 public class StagingItemWriter<T> extends JdbcDaoSupport implements StepExecutionListener, ItemWriter<T> {
40
41 public static final String NEW = "N";
42
43 public static final String DONE = "Y";
44
45 public static final Object WORKING = "W";
46
47 private DataFieldMaxValueIncrementer incrementer;
48
49 private StepExecution stepExecution;
50
51
52
53
54
55
56 protected void initDao() throws Exception {
57 super.initDao();
58 Assert.notNull(incrementer, "DataFieldMaxValueIncrementer is required - set the incrementer property in the "
59 + ClassUtils.getShortName(StagingItemWriter.class));
60 }
61
62
63
64
65
66
67 public void setIncrementer(DataFieldMaxValueIncrementer incrementer) {
68 this.incrementer = incrementer;
69 }
70
71
72
73
74
75
76 public void write(final List<? extends T> items) {
77
78 final ListIterator<? extends T> itemIterator = items.listIterator();
79 getJdbcTemplate().batchUpdate("INSERT into BATCH_STAGING (ID, JOB_ID, VALUE, PROCESSED) values (?,?,?,?)",
80 new BatchPreparedStatementSetter() {
81
82 public int getBatchSize() {
83 return items.size();
84 }
85
86 public void setValues(PreparedStatement ps, int i) throws SQLException {
87
88 long id = incrementer.nextLongValue();
89 long jobId = stepExecution.getJobExecution().getJobId();
90
91 Assert.state(itemIterator.nextIndex() == i,
92 "Item ordering must be preserved in batch sql update");
93
94 byte[] blob = SerializationUtils.serialize((Serializable) itemIterator.next());
95
96 ps.setLong(1, id);
97 ps.setLong(2, jobId);
98 ps.setBytes(3, blob);
99 ps.setString(4, NEW);
100 }
101 });
102
103 }
104
105
106
107
108
109
110
111
112 public ExitStatus afterStep(StepExecution stepExecution) {
113 return null;
114 }
115
116
117
118
119
120
121
122 public void beforeStep(StepExecution stepExecution) {
123 this.stepExecution = stepExecution;
124 }
125
126 }