1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.step.item;
17
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.springframework.batch.item.ExecutionContext;
21 import org.springframework.batch.item.ItemReader;
22 import org.springframework.batch.item.ItemStream;
23 import org.springframework.batch.item.ItemStreamException;
24 import org.springframework.batch.item.support.CompositeItemStream;
25
26
27
28
29
30
31
32
33
34
35 public class ChunkMonitor implements ItemStream {
36
37 private Log logger = LogFactory.getLog(getClass());
38
39 private boolean streamsRegistered = false;
40
41 public static class ChunkMonitorData {
42 public int offset;
43
44 public int chunkSize;
45
46 public ChunkMonitorData(int offset, int chunkSize) {
47 this.offset = offset;
48 this.chunkSize = chunkSize;
49 }
50 }
51
52 private static final String OFFSET = ChunkMonitor.class.getName() + ".OFFSET";
53
54 private CompositeItemStream stream = new CompositeItemStream();
55
56 private ThreadLocal<ChunkMonitorData> holder = new ThreadLocal<ChunkMonitorData>();
57
58 private ItemReader<?> reader;
59
60
61
62
63 public void registerItemStream(ItemStream stream) {
64 streamsRegistered = true;
65 this.stream.register(stream);
66 }
67
68
69
70
71 public void setItemReader(ItemReader<?> reader) {
72 this.reader = reader;
73 }
74
75 public void incrementOffset() {
76 ChunkMonitorData data = getData();
77 data.offset ++;
78 if (data.offset >= data.chunkSize) {
79 resetOffset();
80 }
81 }
82
83 public int getOffset() {
84 return getData().offset;
85 }
86
87 public void resetOffset() {
88 getData().offset = 0;
89 }
90
91 public void setChunkSize(int chunkSize) {
92 getData().chunkSize = chunkSize;
93 resetOffset();
94 }
95
96 @Override
97 public void close() throws ItemStreamException {
98 holder.set(null);
99 if (streamsRegistered) {
100 stream.close();
101 }
102 }
103
104 @Override
105 public void open(ExecutionContext executionContext) throws ItemStreamException {
106 if (streamsRegistered) {
107 stream.open(executionContext);
108 ChunkMonitorData data = new ChunkMonitorData(executionContext.getInt(OFFSET, 0), 0);
109 holder.set(data);
110 if (reader == null) {
111 logger.warn("No ItemReader set (must be concurrent step), so ignoring offset data.");
112 return;
113 }
114 for (int i = 0; i < data.offset; i++) {
115 try {
116 reader.read();
117 }
118 catch (Exception e) {
119 throw new ItemStreamException("Could not position reader with offset: " + data.offset, e);
120 }
121 }
122 }
123 }
124
125 @Override
126 public void update(ExecutionContext executionContext) throws ItemStreamException {
127 if (streamsRegistered) {
128 ChunkMonitorData data = getData();
129 if (data.offset == 0) {
130
131
132 stream.update(executionContext);
133 }
134 else {
135 executionContext.putInt(OFFSET, data.offset);
136 }
137 }
138 }
139
140 private ChunkMonitorData getData() {
141 ChunkMonitorData data = holder.get();
142 if (data==null) {
143 if (streamsRegistered) {
144 logger.warn("ItemStream was opened in a different thread. Restart data could be compromised.");
145 }
146 data = new ChunkMonitorData(0,0);
147 holder.set(data);
148 }
149 return data;
150 }
151
152 }