1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.file;
18
19 import java.util.Arrays;
20 import java.util.Comparator;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.springframework.batch.item.*;
25 import org.springframework.batch.item.util.ExecutionContextUserSupport;
26 import org.springframework.core.io.Resource;
27 import org.springframework.util.Assert;
28 import org.springframework.util.ClassUtils;
29
30
31
32
33
34
35
36
37
38
39
40
41 public class MultiResourceItemReader<T> implements ItemReader<T>, ItemStream {
42
43 private static final Log logger = LogFactory.getLog(MultiResourceItemReader.class);
44
45 private static final String RESOURCE_KEY = "resourceIndex";
46
47 private final ExecutionContextUserSupport executionContextUserSupport = new ExecutionContextUserSupport();
48
49 private ResourceAwareItemReaderItemStream<? extends T> delegate;
50
51 private Resource[] resources;
52
53 private boolean saveState = true;
54
55 private int currentResource = -1;
56
57
58 private boolean noInput;
59
60 private boolean strict = false;
61
62
63
64
65
66
67 public void setStrict(boolean strict) {
68 this.strict = strict;
69 }
70
71 private Comparator<Resource> comparator = new Comparator<Resource>() {
72
73
74
75
76 @Override
77 public int compare(Resource r1, Resource r2) {
78 return r1.getFilename().compareTo(r2.getFilename());
79 }
80
81 };
82
83 public MultiResourceItemReader() {
84 executionContextUserSupport.setName(ClassUtils.getShortName(MultiResourceItemReader.class));
85 }
86
87
88
89
90 @Override
91 public T read() throws Exception, UnexpectedInputException, ParseException {
92
93 if (noInput) {
94 return null;
95 }
96
97
98
99 if (currentResource == -1) {
100 currentResource = 0;
101 delegate.setResource(resources[currentResource]);
102 delegate.open(new ExecutionContext());
103 }
104
105 return readNextItem();
106 }
107
108
109
110
111
112
113
114 private T readNextItem() throws Exception {
115
116 T item = readFromDelegate();
117
118 while (item == null) {
119
120 currentResource++;
121
122 if (currentResource >= resources.length) {
123 return null;
124 }
125
126 delegate.close();
127 delegate.setResource(resources[currentResource]);
128 delegate.open(new ExecutionContext());
129
130 item = readFromDelegate();
131 }
132
133 return item;
134 }
135
136 private T readFromDelegate() throws Exception {
137 T item = delegate.read();
138 if(item instanceof ResourceAware){
139 ((ResourceAware) item).setResource(getCurrentResource());
140 }
141 return item;
142 }
143
144
145
146
147 @Override
148 public void close() throws ItemStreamException {
149 delegate.close();
150 noInput = false;
151 }
152
153
154
155
156
157 @Override
158 public void open(ExecutionContext executionContext) throws ItemStreamException {
159
160 Assert.notNull(resources, "Resources must be set");
161
162 noInput = false;
163 if (resources.length == 0) {
164 if (strict) {
165 throw new IllegalStateException(
166 "No resources to read. Set strict=false if this is not an error condition.");
167 }
168 else {
169 logger.warn("No resources to read. Set strict=true if this should be an error condition.");
170 noInput = true;
171 return;
172 }
173 }
174
175 Arrays.sort(resources, comparator);
176
177 if (executionContext.containsKey(executionContextUserSupport.getKey(RESOURCE_KEY))) {
178 currentResource = executionContext.getInt(executionContextUserSupport.getKey(RESOURCE_KEY));
179
180
181 if (currentResource == -1) {
182 currentResource = 0;
183 }
184
185 delegate.setResource(resources[currentResource]);
186 delegate.open(executionContext);
187 }
188 else {
189 currentResource = -1;
190 }
191 }
192
193
194
195
196 @Override
197 public void update(ExecutionContext executionContext) throws ItemStreamException {
198 if (saveState) {
199 executionContext.putInt(executionContextUserSupport.getKey(RESOURCE_KEY), currentResource);
200 delegate.update(executionContext);
201 }
202 }
203
204
205
206
207 public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) {
208 this.delegate = delegate;
209 }
210
211
212
213
214
215
216
217 public void setSaveState(boolean saveState) {
218 this.saveState = saveState;
219 }
220
221
222
223
224
225 public void setComparator(Comparator<Resource> comparator) {
226 this.comparator = comparator;
227 }
228
229
230
231
232 public void setResources(Resource[] resources) {
233 Assert.notNull(resources, "The resources must not be null");
234 this.resources = Arrays.asList(resources).toArray(new Resource[resources.length]);
235 }
236
237 public Resource getCurrentResource() {
238 if (currentResource >= resources.length || currentResource < 0) {
239 return null;
240 }
241 return resources[currentResource];
242 }
243
244 }