1 /*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.springframework.batch.sample.domain.multiline;
18
19 import java.util.ArrayList;
20 import java.util.List;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.springframework.batch.item.ItemReader;
25
26 /**
27 * An {@link ItemReader} that delivers a list as its item, storing up objects
28 * from the injected {@link ItemReader} until they are ready to be packed out as
29 * a collection. This class must be used as a wrapper for a custom
30 * {@link ItemReader} that can identify the record boundaries. The custom reader
31 * should mark the beginning and end of records by returning an
32 * {@link AggregateItem} which responds true to its query methods
33 * <code>is*()</code>.<br/><br/>
34 *
35 * This class is thread safe (it can be used concurrently by multiple threads)
36 * as long as the {@link ItemReader} is also thread safe.
37 *
38 * @see AggregateItem#isHeader()
39 * @see AggregateItem#isFooter()
40 *
41 * @author Dave Syer
42 *
43 */
44 public class AggregateItemReader<T> implements ItemReader<List<T>> {
45
46 private static final Log log = LogFactory.getLog(AggregateItemReader.class);
47
48 private ItemReader<AggregateItem<T>> itemReader;
49
50 /**
51 * Get the next list of records.
52 * @throws Exception
53 *
54 * @see org.springframework.batch.item.ItemReader#read()
55 */
56 public List<T> read() throws Exception {
57 ResultHolder holder = new ResultHolder();
58
59 while (process(itemReader.read(), holder)) {
60 continue;
61 }
62
63 if (!holder.exhausted) {
64 return holder.records;
65 }
66 else {
67 return null;
68 }
69 }
70
71 private boolean process(AggregateItem<T> value, ResultHolder holder) {
72 // finish processing if we hit the end of file
73 if (value == null) {
74 log.debug("Exhausted ItemReader");
75 holder.exhausted = true;
76 return false;
77 }
78
79 // start a new collection
80 if (value.isHeader()) {
81 log.debug("Start of new record detected");
82 return true;
83 }
84
85 // mark we are finished with current collection
86 if (value.isFooter()) {
87 log.debug("End of record detected");
88 return false;
89 }
90
91 // add a simple record to the current collection
92 log.debug("Mapping: " + value);
93 holder.records.add(value.getItem());
94 return true;
95 }
96
97 public void setItemReader(ItemReader<AggregateItem<T>> itemReader) {
98 this.itemReader = itemReader;
99 }
100
101 /**
102 * Private class for temporary state management while item is being
103 * collected.
104 *
105 * @author Dave Syer
106 *
107 */
108 private class ResultHolder {
109 List<T> records = new ArrayList<T>();
110
111 boolean exhausted = false;
112 }
113
114 }