1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.file;
18
19 import java.io.BufferedWriter;
20 import java.io.File;
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.Writer;
24 import java.nio.channels.Channels;
25 import java.nio.channels.FileChannel;
26 import java.nio.charset.UnsupportedCharsetException;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.springframework.batch.item.ExecutionContext;
32 import org.springframework.batch.item.ItemStream;
33 import org.springframework.batch.item.ItemStreamException;
34 import org.springframework.batch.item.WriteFailedException;
35 import org.springframework.batch.item.WriterNotOpenException;
36 import org.springframework.batch.item.file.transform.LineAggregator;
37 import org.springframework.batch.item.util.ExecutionContextUserSupport;
38 import org.springframework.batch.item.util.FileUtils;
39 import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter;
40 import org.springframework.beans.factory.InitializingBean;
41 import org.springframework.core.io.Resource;
42 import org.springframework.util.Assert;
43 import org.springframework.util.ClassUtils;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class FlatFileItemWriter<T> extends ExecutionContextUserSupport implements ResourceAwareItemWriterItemStream<T>,
61 InitializingBean {
62
63 private static final boolean DEFAULT_TRANSACTIONAL = true;
64
65 protected static final Log logger = LogFactory.getLog(FlatFileItemWriter.class);
66
67 private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
68
69 private static final String WRITTEN_STATISTICS_NAME = "written";
70
71 private static final String RESTART_DATA_NAME = "current.count";
72
73 private Resource resource;
74
75 private OutputState state = null;
76
77 private LineAggregator<T> lineAggregator;
78
79 private boolean saveState = true;
80
81 private boolean forceSync = false;
82
83 private boolean shouldDeleteIfExists = true;
84
85 private boolean shouldDeleteIfEmpty = false;
86
87 private String encoding = OutputState.DEFAULT_CHARSET;
88
89 private FlatFileHeaderCallback headerCallback;
90
91 private FlatFileFooterCallback footerCallback;
92
93 private String lineSeparator = DEFAULT_LINE_SEPARATOR;
94
95 private boolean transactional = DEFAULT_TRANSACTIONAL;
96
97 private boolean append = false;
98
99 public FlatFileItemWriter() {
100 setName(ClassUtils.getShortName(FlatFileItemWriter.class));
101 }
102
103
104
105
106
107
108 @Override
109 public void afterPropertiesSet() throws Exception {
110 Assert.notNull(lineAggregator, "A LineAggregator must be provided.");
111 if (append) {
112 shouldDeleteIfExists = false;
113 }
114 }
115
116
117
118
119
120
121
122
123
124
125 public void setForceSync(boolean forceSync) {
126 this.forceSync = forceSync;
127 }
128
129
130
131
132
133
134 public void setLineSeparator(String lineSeparator) {
135 this.lineSeparator = lineSeparator;
136 }
137
138
139
140
141
142
143
144 public void setLineAggregator(LineAggregator<T> lineAggregator) {
145 this.lineAggregator = lineAggregator;
146 }
147
148
149
150
151
152
153 @Override
154 public void setResource(Resource resource) {
155 this.resource = resource;
156 }
157
158
159
160
161 public void setEncoding(String newEncoding) {
162 this.encoding = newEncoding;
163 }
164
165
166
167
168
169
170
171
172
173
174 public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) {
175 this.shouldDeleteIfExists = shouldDeleteIfExists;
176 }
177
178
179
180
181
182
183
184
185
186
187 public void setAppendAllowed(boolean append) {
188 this.append = append;
189 this.shouldDeleteIfExists = false;
190 }
191
192
193
194
195
196
197
198 public void setShouldDeleteIfEmpty(boolean shouldDeleteIfEmpty) {
199 this.shouldDeleteIfEmpty = shouldDeleteIfEmpty;
200 }
201
202
203
204
205
206
207
208
209
210 public void setSaveState(boolean saveState) {
211 this.saveState = saveState;
212 }
213
214
215
216
217
218 public void setHeaderCallback(FlatFileHeaderCallback headerCallback) {
219 this.headerCallback = headerCallback;
220 }
221
222
223
224
225
226 public void setFooterCallback(FlatFileFooterCallback footerCallback) {
227 this.footerCallback = footerCallback;
228 }
229
230
231
232
233
234 public void setTransactional(boolean transactional) {
235 this.transactional = transactional;
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 @Override
252 public void write(List<? extends T> items) throws Exception {
253
254 if (!getOutputState().isInitialized()) {
255 throw new WriterNotOpenException("Writer must be open before it can be written to");
256 }
257
258 if (logger.isDebugEnabled()) {
259 logger.debug("Writing to flat file with " + items.size() + " items.");
260 }
261
262 OutputState state = getOutputState();
263
264 StringBuilder lines = new StringBuilder();
265 int lineCount = 0;
266 for (T item : items) {
267 lines.append(lineAggregator.aggregate(item) + lineSeparator);
268 lineCount++;
269 }
270 try {
271 state.write(lines.toString());
272 }
273 catch (IOException e) {
274 throw new WriteFailedException("Could not write data. The file may be corrupt.", e);
275 }
276 state.linesWritten += lineCount;
277 }
278
279
280
281
282 @Override
283 public void close() {
284 if (state != null) {
285 try {
286 if (footerCallback != null && state.outputBufferedWriter != null) {
287 footerCallback.writeFooter(state.outputBufferedWriter);
288 state.outputBufferedWriter.flush();
289 }
290 }
291 catch (IOException e) {
292 throw new ItemStreamException("Failed to write footer before closing", e);
293 }
294 finally {
295 state.close();
296 if (state.linesWritten == 0 && shouldDeleteIfEmpty) {
297 try {
298 resource.getFile().delete();
299 }
300 catch (IOException e) {
301 throw new ItemStreamException("Failed to delete empty file on close", e);
302 }
303 }
304 state = null;
305 }
306 }
307 }
308
309
310
311
312
313
314
315 @Override
316 public void open(ExecutionContext executionContext) throws ItemStreamException {
317
318 Assert.notNull(resource, "The resource must be set");
319
320 if (!getOutputState().isInitialized()) {
321 doOpen(executionContext);
322 }
323 }
324
325 private void doOpen(ExecutionContext executionContext) throws ItemStreamException {
326 OutputState outputState = getOutputState();
327 if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) {
328 outputState.restoreFrom(executionContext);
329 }
330 try {
331 outputState.initializeBufferedWriter();
332 }
333 catch (IOException ioe) {
334 throw new ItemStreamException("Failed to initialize writer", ioe);
335 }
336 if (outputState.lastMarkedByteOffsetPosition == 0 && !outputState.appending) {
337 if (headerCallback != null) {
338 try {
339 headerCallback.writeHeader(outputState.outputBufferedWriter);
340 outputState.write(lineSeparator);
341 }
342 catch (IOException e) {
343 throw new ItemStreamException("Could not write headers. The file may be corrupt.", e);
344 }
345 }
346 }
347 }
348
349
350
351
352 @Override
353 public void update(ExecutionContext executionContext) {
354 if (state == null) {
355 throw new ItemStreamException("ItemStream not open or already closed.");
356 }
357
358 Assert.notNull(executionContext, "ExecutionContext must not be null");
359
360 if (saveState) {
361
362 try {
363 executionContext.putLong(getKey(RESTART_DATA_NAME), state.position());
364 }
365 catch (IOException e) {
366 throw new ItemStreamException("ItemStream does not return current position properly", e);
367 }
368
369 executionContext.putLong(getKey(WRITTEN_STATISTICS_NAME), state.linesWritten);
370 }
371 }
372
373
374 private OutputState getOutputState() {
375 if (state == null) {
376 File file;
377 try {
378 file = resource.getFile();
379 }
380 catch (IOException e) {
381 throw new ItemStreamException("Could not convert resource to file: [" + resource + "]", e);
382 }
383 Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]");
384 state = new OutputState();
385 state.setDeleteIfExists(shouldDeleteIfExists);
386 state.setAppendAllowed(append);
387 state.setEncoding(encoding);
388 }
389 return (OutputState) state;
390 }
391
392
393
394
395
396 private class OutputState {
397
398 private static final String DEFAULT_CHARSET = "UTF-8";
399
400 private FileOutputStream os;
401
402
403 Writer outputBufferedWriter;
404
405 FileChannel fileChannel;
406
407
408
409 String encoding = DEFAULT_CHARSET;
410
411 boolean restarted = false;
412
413 long lastMarkedByteOffsetPosition = 0;
414
415 long linesWritten = 0;
416
417 boolean shouldDeleteIfExists = true;
418
419 boolean initialized = false;
420
421 private boolean append = false;
422
423 private boolean appending = false;
424
425
426
427
428
429 public long position() throws IOException {
430 long pos = 0;
431
432 if (fileChannel == null) {
433 return 0;
434 }
435
436 outputBufferedWriter.flush();
437 pos = fileChannel.position();
438 if (transactional) {
439 pos += ((TransactionAwareBufferedWriter) outputBufferedWriter).getBufferSize();
440 }
441
442 return pos;
443
444 }
445
446
447
448
449 public void setAppendAllowed(boolean append) {
450 this.append = append;
451 }
452
453
454
455
456 public void restoreFrom(ExecutionContext executionContext) {
457 lastMarkedByteOffsetPosition = executionContext.getLong(getKey(RESTART_DATA_NAME));
458 restarted = true;
459 }
460
461
462
463
464 public void setDeleteIfExists(boolean shouldDeleteIfExists) {
465 this.shouldDeleteIfExists = shouldDeleteIfExists;
466 }
467
468
469
470
471 public void setEncoding(String encoding) {
472 this.encoding = encoding;
473 }
474
475
476
477
478 public void close() {
479
480 initialized = false;
481 restarted = false;
482 try {
483 if (outputBufferedWriter != null) {
484 outputBufferedWriter.close();
485 }
486 }
487 catch (IOException ioe) {
488 throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
489 }
490 finally {
491 if (!transactional) {
492 closeStream();
493 }
494 }
495 }
496
497 private void closeStream() {
498 try {
499 if (fileChannel != null) {
500 fileChannel.close();
501 }
502 }
503 catch (IOException ioe) {
504 throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
505 }
506 finally {
507 try {
508 if (os != null) {
509 os.close();
510 }
511 }
512 catch (IOException ioe) {
513 throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
514 }
515 }
516 }
517
518
519
520
521
522 public void write(String line) throws IOException {
523 if (!initialized) {
524 initializeBufferedWriter();
525 }
526
527 outputBufferedWriter.write(line);
528 outputBufferedWriter.flush();
529 }
530
531
532
533
534
535
536 public void truncate() throws IOException {
537 fileChannel.truncate(lastMarkedByteOffsetPosition);
538 fileChannel.position(lastMarkedByteOffsetPosition);
539 }
540
541
542
543
544
545
546 private void initializeBufferedWriter() throws IOException {
547
548 File file = resource.getFile();
549 FileUtils.setUpOutputFile(file, restarted, append, shouldDeleteIfExists);
550
551 os = new FileOutputStream(file.getAbsolutePath(), true);
552 fileChannel = os.getChannel();
553
554 outputBufferedWriter = getBufferedWriter(fileChannel, encoding);
555 outputBufferedWriter.flush();
556
557 if (append) {
558
559
560 if (file.length() > 0) {
561 appending = true;
562
563 }
564 }
565
566 Assert.state(outputBufferedWriter != null);
567
568 if (restarted) {
569 checkFileSize();
570 truncate();
571 }
572
573 initialized = true;
574 linesWritten = 0;
575 }
576
577 public boolean isInitialized() {
578 return initialized;
579 }
580
581
582
583
584
585 private Writer getBufferedWriter(FileChannel fileChannel, String encoding) {
586 try {
587 final FileChannel channel = fileChannel;
588 if (transactional) {
589 TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, new Runnable() {
590 @Override
591 public void run() {
592 closeStream();
593 }
594 });
595
596 writer.setEncoding(encoding);
597 return writer;
598 }
599 else {
600 Writer writer = new BufferedWriter(Channels.newWriter(fileChannel, encoding)) {
601 @Override
602 public void flush() throws IOException {
603 super.flush();
604 if (forceSync) {
605 channel.force(false);
606 }
607 }
608 };
609
610 return new BufferedWriter(writer);
611 }
612 }
613 catch (UnsupportedCharsetException ucse) {
614 throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse);
615 }
616 }
617
618
619
620
621
622
623
624
625 private void checkFileSize() throws IOException {
626 long size = -1;
627
628 outputBufferedWriter.flush();
629 size = fileChannel.size();
630
631 if (size < lastMarkedByteOffsetPosition) {
632 throw new ItemStreamException("Current file size is smaller than size at last commit");
633 }
634 }
635
636 }
637
638 }