1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.file;
18
19 import java.io.File;
20 import java.io.IOException;
21 import java.util.List;
22
23 import org.springframework.batch.item.ExecutionContext;
24 import org.springframework.batch.item.ItemStream;
25 import org.springframework.batch.item.ItemStreamException;
26 import org.springframework.batch.item.ItemWriter;
27 import org.springframework.batch.item.util.ExecutionContextUserSupport;
28 import org.springframework.core.io.FileSystemResource;
29 import org.springframework.core.io.Resource;
30 import org.springframework.util.Assert;
31 import org.springframework.util.ClassUtils;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 public class MultiResourceItemWriter<T> extends ExecutionContextUserSupport implements ItemWriter<T>, ItemStream {
48
49 final static private String RESOURCE_INDEX_KEY = "resource.index";
50
51 final static private String CURRENT_RESOURCE_ITEM_COUNT = "resource.item.count";
52
53 private Resource resource;
54
55 private ResourceAwareItemWriterItemStream<? super T> delegate;
56
57 private int itemCountLimitPerResource = Integer.MAX_VALUE;
58
59 private int currentResourceItemCount = 0;
60
61 private int resourceIndex = 1;
62
63 private ResourceSuffixCreator suffixCreator = new SimpleResourceSuffixCreator();
64
65 private boolean saveState = true;
66
67 private boolean opened = false;
68
69 public MultiResourceItemWriter() {
70 setName(ClassUtils.getShortName(MultiResourceItemWriter.class));
71 }
72
73 @Override
74 public void write(List<? extends T> items) throws Exception {
75 if (!opened) {
76 File file = setResourceToDelegate();
77
78 file.createNewFile();
79 Assert.state(file.canWrite(), "Output resource " + file.getAbsolutePath() + " must be writable");
80 delegate.open(new ExecutionContext());
81 opened = true;
82 }
83 delegate.write(items);
84 currentResourceItemCount += items.size();
85 if (currentResourceItemCount >= itemCountLimitPerResource) {
86 delegate.close();
87 resourceIndex++;
88 currentResourceItemCount = 0;
89 setResourceToDelegate();
90 opened = false;
91 }
92 }
93
94
95
96
97
98 public void setResourceSuffixCreator(ResourceSuffixCreator suffixCreator) {
99 this.suffixCreator = suffixCreator;
100 }
101
102
103
104
105
106 public void setItemCountLimitPerResource(int itemCountLimitPerResource) {
107 this.itemCountLimitPerResource = itemCountLimitPerResource;
108 }
109
110
111
112
113 public void setDelegate(ResourceAwareItemWriterItemStream<? super T> delegate) {
114 this.delegate = delegate;
115 }
116
117
118
119
120
121
122
123 public void setResource(Resource resource) {
124 this.resource = resource;
125 }
126
127 public void setSaveState(boolean saveState) {
128 this.saveState = saveState;
129 }
130
131 @Override
132 public void close() throws ItemStreamException {
133 resourceIndex = 1;
134 currentResourceItemCount = 0;
135 if (opened) {
136 delegate.close();
137 }
138 }
139
140 @Override
141 public void open(ExecutionContext executionContext) throws ItemStreamException {
142 resourceIndex = executionContext.getInt(getKey(RESOURCE_INDEX_KEY), 1);
143 currentResourceItemCount = executionContext.getInt(getKey(CURRENT_RESOURCE_ITEM_COUNT), 0);
144
145 try {
146 setResourceToDelegate();
147 }
148 catch (IOException e) {
149 throw new ItemStreamException("Couldn't assign resource", e);
150 }
151
152 if (executionContext.containsKey(getKey(CURRENT_RESOURCE_ITEM_COUNT))) {
153
154 delegate.open(executionContext);
155 }
156 else {
157 opened = false;
158 }
159 }
160
161 @Override
162 public void update(ExecutionContext executionContext) throws ItemStreamException {
163 if (saveState) {
164 if (opened) {
165 delegate.update(executionContext);
166 }
167 executionContext.putInt(getKey(CURRENT_RESOURCE_ITEM_COUNT), currentResourceItemCount);
168 executionContext.putInt(getKey(RESOURCE_INDEX_KEY), resourceIndex);
169 }
170 }
171
172
173
174
175 private File setResourceToDelegate() throws IOException {
176 String path = resource.getFile().getAbsolutePath() + suffixCreator.getSuffix(resourceIndex);
177 File file = new File(path);
178 delegate.setResource(new FileSystemResource(file));
179 return file;
180 }
181 }