1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.step.item;
18
19 import java.util.List;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.springframework.batch.core.StepContribution;
24 import org.springframework.batch.core.StepListener;
25 import org.springframework.batch.core.listener.MulticasterBatchListener;
26 import org.springframework.batch.item.ItemReader;
27 import org.springframework.batch.repeat.RepeatCallback;
28 import org.springframework.batch.repeat.RepeatContext;
29 import org.springframework.batch.repeat.RepeatOperations;
30 import org.springframework.batch.repeat.RepeatStatus;
31
32
33
34
35
36
37
38
39
40 public class SimpleChunkProvider<I> implements ChunkProvider<I> {
41
42 protected final Log logger = LogFactory.getLog(getClass());
43
44 protected final ItemReader<? extends I> itemReader;
45
46 private final MulticasterBatchListener<I, ?> listener = new MulticasterBatchListener<I, Object>();
47
48 private final RepeatOperations repeatOperations;
49
50 public SimpleChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) {
51 this.itemReader = itemReader;
52 this.repeatOperations = repeatOperations;
53 }
54
55
56
57
58
59
60
61 public void setListeners(List<? extends StepListener> listeners) {
62 for (StepListener listener : listeners) {
63 registerListener(listener);
64 }
65 }
66
67
68
69
70
71
72 public void registerListener(StepListener listener) {
73 this.listener.register(listener);
74 }
75
76
77
78
79 protected MulticasterBatchListener<I, ?> getListener() {
80 return listener;
81 }
82
83
84
85
86
87
88 protected final I doRead() throws Exception {
89 try {
90 listener.beforeRead();
91 I item = itemReader.read();
92 if(item != null) {
93 listener.afterRead(item);
94 }
95 return item;
96 }
97 catch (Exception e) {
98 logger.debug(e.getMessage() + " : " + e.getClass().getName());
99 listener.onReadError(e);
100 throw e;
101 }
102 }
103
104 @Override
105 public Chunk<I> provide(final StepContribution contribution) throws Exception {
106
107 final Chunk<I> inputs = new Chunk<I>();
108 repeatOperations.iterate(new RepeatCallback() {
109
110 @Override
111 public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
112 I item = null;
113 try {
114 item = read(contribution, inputs);
115 }
116 catch (SkipOverflowException e) {
117
118
119 return RepeatStatus.FINISHED;
120 }
121 if (item == null) {
122 inputs.setEnd();
123 return RepeatStatus.FINISHED;
124 }
125 inputs.add(item);
126 contribution.incrementReadCount();
127 return RepeatStatus.CONTINUABLE;
128 }
129
130 });
131
132 return inputs;
133
134 }
135
136 @Override
137 public void postProcess(StepContribution contribution, Chunk<I> chunk) {
138
139 }
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154 protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception {
155 return doRead();
156 }
157
158 }