1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.file;
18
19 import java.io.BufferedReader;
20 import java.io.IOException;
21 import java.nio.charset.Charset;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.springframework.batch.item.ItemReader;
26 import org.springframework.batch.item.ReaderNotOpenException;
27 import org.springframework.batch.item.file.separator.RecordSeparatorPolicy;
28 import org.springframework.batch.item.file.separator.SimpleRecordSeparatorPolicy;
29 import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
30 import org.springframework.beans.factory.InitializingBean;
31 import org.springframework.core.io.Resource;
32 import org.springframework.util.Assert;
33 import org.springframework.util.ClassUtils;
34 import org.springframework.util.StringUtils;
35
36
37
38
39
40
41
42
43
44 public class FlatFileItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
45 ResourceAwareItemReaderItemStream<T>, InitializingBean {
46
47 private static final Log logger = LogFactory.getLog(FlatFileItemReader.class);
48
49
50 public static final String DEFAULT_CHARSET = Charset.defaultCharset().name();
51
52 private RecordSeparatorPolicy recordSeparatorPolicy = new SimpleRecordSeparatorPolicy();
53
54 private Resource resource;
55
56 private BufferedReader reader;
57
58 private int lineCount = 0;
59
60 private String[] comments = new String[] { "#" };
61
62 private boolean noInput = false;
63
64 private String encoding = DEFAULT_CHARSET;
65
66 private LineMapper<T> lineMapper;
67
68 private int linesToSkip = 0;
69
70 private LineCallbackHandler skippedLinesCallback;
71
72 private boolean strict = true;
73
74 private BufferedReaderFactory bufferedReaderFactory = new DefaultBufferedReaderFactory();
75
76 public FlatFileItemReader() {
77 setName(ClassUtils.getShortName(FlatFileItemReader.class));
78 }
79
80
81
82
83
84
85 public void setStrict(boolean strict) {
86 this.strict = strict;
87 }
88
89
90
91
92 public void setSkippedLinesCallback(LineCallbackHandler skippedLinesCallback) {
93 this.skippedLinesCallback = skippedLinesCallback;
94 }
95
96
97
98
99
100
101
102 public void setLinesToSkip(int linesToSkip) {
103 this.linesToSkip = linesToSkip;
104 }
105
106
107
108
109
110 public void setLineMapper(LineMapper<T> lineMapper) {
111 this.lineMapper = lineMapper;
112 }
113
114
115
116
117
118
119 public void setEncoding(String encoding) {
120 this.encoding = encoding;
121 }
122
123
124
125
126
127
128
129
130 public void setBufferedReaderFactory(BufferedReaderFactory bufferedReaderFactory) {
131 this.bufferedReaderFactory = bufferedReaderFactory;
132 }
133
134
135
136
137
138
139
140 public void setComments(String[] comments) {
141 this.comments = new String[comments.length];
142 System.arraycopy(comments, 0, this.comments, 0, comments.length);
143 }
144
145
146
147
148 @Override
149 public void setResource(Resource resource) {
150 this.resource = resource;
151 }
152
153
154
155
156
157
158
159 public void setRecordSeparatorPolicy(RecordSeparatorPolicy recordSeparatorPolicy) {
160 this.recordSeparatorPolicy = recordSeparatorPolicy;
161 }
162
163
164
165
166
167 @Override
168 protected T doRead() throws Exception {
169 if (noInput) {
170 return null;
171 }
172
173 String line = readLine();
174
175 if (line == null) {
176 return null;
177 }
178 else {
179 try {
180 return lineMapper.mapLine(line, lineCount);
181 }
182 catch (Exception ex) {
183 throw new FlatFileParseException("Parsing error at line: " + lineCount + " in resource=["
184 + resource.getDescription() + "], input=[" + line + "]", ex, line, lineCount);
185 }
186 }
187 }
188
189
190
191
192 private String readLine() {
193
194 if (reader == null) {
195 throw new ReaderNotOpenException("Reader must be open before it can be read.");
196 }
197
198 String line = null;
199
200 try {
201 line = this.reader.readLine();
202 if (line == null) {
203 return null;
204 }
205 lineCount++;
206 while (isComment(line)) {
207 line = reader.readLine();
208 if (line == null) {
209 return null;
210 }
211 lineCount++;
212 }
213
214 line = applyRecordSeparatorPolicy(line);
215 }
216 catch (IOException e) {
217
218
219 noInput = true;
220 throw new NonTransientFlatFileException("Unable to read from resource: [" + resource + "]", e, line,
221 lineCount);
222 }
223 return line;
224 }
225
226 private boolean isComment(String line) {
227 for (String prefix : comments) {
228 if (line.startsWith(prefix)) {
229 return true;
230 }
231 }
232 return false;
233 }
234
235 @Override
236 protected void doClose() throws Exception {
237 lineCount = 0;
238 if (reader != null) {
239 reader.close();
240 }
241 }
242
243 @Override
244 protected void doOpen() throws Exception {
245 Assert.notNull(resource, "Input resource must be set");
246 Assert.notNull(recordSeparatorPolicy, "RecordSeparatorPolicy must be set");
247
248 noInput = true;
249 if (!resource.exists()) {
250 if (strict) {
251 throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
252 }
253 logger.warn("Input resource does not exist " + resource.getDescription());
254 return;
255 }
256
257 if (!resource.isReadable()) {
258 if (strict) {
259 throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode): "
260 + resource);
261 }
262 logger.warn("Input resource is not readable " + resource.getDescription());
263 return;
264 }
265
266 reader = bufferedReaderFactory.create(resource, encoding);
267 for (int i = 0; i < linesToSkip; i++) {
268 String line = readLine();
269 if (skippedLinesCallback != null) {
270 skippedLinesCallback.handleLine(line);
271 }
272 }
273 noInput = false;
274 }
275
276 @Override
277 public void afterPropertiesSet() throws Exception {
278 Assert.notNull(lineMapper, "LineMapper is required");
279 }
280
281 @Override
282 protected void jumpToItem(int itemIndex) throws Exception {
283 for (int i = 0; i < itemIndex; i++) {
284 readLine();
285 }
286 }
287
288 private String applyRecordSeparatorPolicy(String line) throws IOException {
289
290 String record = line;
291 while (line != null && !recordSeparatorPolicy.isEndOfRecord(record)) {
292 line = this.reader.readLine();
293 if (line == null) {
294 if (StringUtils.hasText(record)) {
295
296
297 throw new FlatFileParseException("Unexpected end of file before record complete", record, lineCount);
298 }
299 else {
300
301
302
303 break;
304 }
305 }
306 else {
307 lineCount++;
308 }
309 record = recordSeparatorPolicy.preProcess(record) + line;
310 }
311
312 return recordSeparatorPolicy.postProcess(record);
313
314 }
315
316 }