1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.support;
18
19 import org.springframework.batch.item.ExecutionContext;
20 import org.springframework.batch.item.ItemReader;
21 import org.springframework.batch.item.ItemStreamException;
22 import org.springframework.batch.item.ItemStreamReader;
23 import org.springframework.batch.item.ParseException;
24 import org.springframework.batch.item.UnexpectedInputException;
25 import org.springframework.batch.item.util.ExecutionContextUserSupport;
26 import org.springframework.util.Assert;
27
28
29
30
31
32
33
34
35
36
37 public abstract class AbstractItemCountingItemStreamItemReader<T> implements ItemStreamReader<T> {
38
39 private static final String READ_COUNT = "read.count";
40
41 private static final String READ_COUNT_MAX = "read.count.max";
42
43 private int currentItemCount = 0;
44
45 private int maxItemCount = Integer.MAX_VALUE;
46
47 private ExecutionContextUserSupport ecSupport = new ExecutionContextUserSupport();
48
49 private boolean saveState = true;
50
51
52
53
54
55
56
57 protected abstract T doRead() throws Exception;
58
59
60
61
62 protected abstract void doOpen() throws Exception;
63
64
65
66
67 protected abstract void doClose() throws Exception;
68
69
70
71
72
73
74 protected void jumpToItem(int itemIndex) throws Exception {
75 for (int i = 0; i < itemIndex; i++) {
76 read();
77 }
78 }
79
80 @Override
81 public final T read() throws Exception, UnexpectedInputException, ParseException {
82 if (currentItemCount >= maxItemCount) {
83 return null;
84 }
85 currentItemCount++;
86 return doRead();
87 }
88
89 protected int getCurrentItemCount() {
90 return currentItemCount;
91 }
92
93
94
95
96
97
98
99
100
101
102
103 public void setCurrentItemCount(int count) {
104 this.currentItemCount = count;
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118 public void setMaxItemCount(int count) {
119 this.maxItemCount = count;
120 }
121
122 @Override
123 public void close() throws ItemStreamException {
124 currentItemCount = 0;
125 try {
126 doClose();
127 }
128 catch (Exception e) {
129 throw new ItemStreamException("Error while closing item reader", e);
130 }
131 }
132
133 @Override
134 public void open(ExecutionContext executionContext) throws ItemStreamException {
135
136 try {
137 doOpen();
138 }
139 catch (Exception e) {
140 throw new ItemStreamException("Failed to initialize the reader", e);
141 }
142 if (!isSaveState()) {
143 return;
144 }
145
146 if (executionContext.containsKey(ecSupport.getKey(READ_COUNT_MAX))) {
147 maxItemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT_MAX));
148 }
149
150 if (executionContext.containsKey(ecSupport.getKey(READ_COUNT))) {
151 int itemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT));
152
153 if (itemCount < maxItemCount) {
154 try {
155 jumpToItem(itemCount);
156 }
157 catch (Exception e) {
158 throw new ItemStreamException("Could not move to stored position on restart", e);
159 }
160 }
161 currentItemCount = itemCount;
162
163 }
164
165 }
166
167 @Override
168 public void update(ExecutionContext executionContext) throws ItemStreamException {
169 if (saveState) {
170 Assert.notNull(executionContext, "ExecutionContext must not be null");
171 executionContext.putInt(ecSupport.getKey(READ_COUNT), currentItemCount);
172 if (maxItemCount < Integer.MAX_VALUE) {
173 executionContext.putInt(ecSupport.getKey(READ_COUNT_MAX), maxItemCount);
174 }
175 }
176
177 }
178
179 protected ExecutionContextUserSupport getExecutionContextUserSupport() {
180 return ecSupport;
181 }
182
183
184
185
186
187
188
189
190 public void setName(String name) {
191 ecSupport.setName(name);
192 }
193
194
195
196
197
198
199
200
201
202
203 public void setSaveState(boolean saveState) {
204 this.saveState = saveState;
205 }
206
207
208
209
210
211 public boolean isSaveState() {
212 return saveState;
213 }
214
215 }