1 /*
2 * Copyright 2006-2010 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.springframework.batch.item.support;
17
18 import java.util.Map.Entry;
19
20 import org.springframework.batch.item.ExecutionContext;
21 import org.springframework.batch.item.ItemReader;
22 import org.springframework.batch.item.ItemStream;
23 import org.springframework.batch.item.ItemStreamException;
24 import org.springframework.batch.item.ItemStreamReader;
25 import org.springframework.batch.item.ParseException;
26 import org.springframework.batch.item.PeekableItemReader;
27 import org.springframework.batch.item.UnexpectedInputException;
28
29 /**
30 * <p>
31 * A {@link PeekableItemReader} that allows the user to peek one item ahead.
32 * Repeated calls to {@link #peek()} will return the same item, and this will be
33 * the next item returned from {@link #read()}.
34 * </p>
35 *
36 * <p>
37 * Intentionally not thread safe: it wouldn't be possible to honour the peek in
38 * multiple threads because only one of the threads that peeked would get that
39 * item in the next call to read.
40 * </p>
41 *
42 * @author Dave Syer
43 *
44 */
45 public class SingleItemPeekableItemReader<T> implements ItemStreamReader<T>, PeekableItemReader<T> {
46
47 private ItemReader<T> delegate;
48
49 private T next;
50
51 private ExecutionContext executionContext = new ExecutionContext();
52
53 /**
54 * The item reader to use as a delegate. Items are read from the delegate
55 * and passed to the caller in {@link #read()}.
56 *
57 * @param delegate the delegate to set
58 */
59 public void setDelegate(ItemReader<T> delegate) {
60 this.delegate = delegate;
61 }
62
63 /**
64 * Get the next item from the delegate (whether or not it has already been
65 * peeked at).
66 *
67 * @see ItemReader#read()
68 */
69 @Override
70 public T read() throws Exception, UnexpectedInputException, ParseException {
71 if (next != null) {
72 T item = next;
73 next = null;
74 // executionContext = new ExecutionContext();
75 return item;
76 }
77 return delegate.read();
78 }
79
80 /**
81 * Peek at the next item, ensuring that if the delegate is an
82 * {@link ItemStream} the state is stored for the next call to
83 * {@link #update(ExecutionContext)}.
84 *
85 * @return the next item (or null if there is none).
86 *
87 * @see PeekableItemReader#peek()
88 */
89 @Override
90 public T peek() throws Exception, UnexpectedInputException, ParseException {
91 if (next == null) {
92 updateDelegate(executionContext);
93 next = delegate.read();
94 }
95 return next;
96 }
97
98 /**
99 * If the delegate is an {@link ItemStream}, just pass the call on,
100 * otherwise reset the peek cache.
101 *
102 * @throws ItemStreamException if there is a problem
103 * @see ItemStream#close()
104 */
105 @Override
106 public void close() throws ItemStreamException {
107 next = null;
108 if (delegate instanceof ItemStream) {
109 ((ItemStream) delegate).close();
110 }
111 executionContext = new ExecutionContext();
112 }
113
114 /**
115 * If the delegate is an {@link ItemStream}, just pass the call on,
116 * otherwise reset the peek cache.
117 *
118 * @param executionContext the current context
119 * @throws ItemStreamException if there is a problem
120 * @see ItemStream#open(ExecutionContext)
121 */
122 @Override
123 public void open(ExecutionContext executionContext) throws ItemStreamException {
124 next = null;
125 if (delegate instanceof ItemStream) {
126 ((ItemStream) delegate).open(executionContext);
127 }
128 executionContext = new ExecutionContext();
129 }
130
131 /**
132 * If there is a cached peek, then retrieve the execution context state from
133 * that point. If there is no peek cached, then call directly to the
134 * delegate.
135 *
136 * @param executionContext the current context
137 * @throws ItemStreamException if there is a problem
138 * @see ItemStream#update(ExecutionContext)
139 */
140 @Override
141 public void update(ExecutionContext executionContext) throws ItemStreamException {
142 if (next != null) {
143 // Get the last state from the delegate instead of using
144 // current value.
145 for (Entry<String, Object> entry : this.executionContext.entrySet()) {
146 executionContext.put(entry.getKey(), entry.getValue());
147 }
148 return;
149 }
150 updateDelegate(executionContext);
151 }
152
153 private void updateDelegate(ExecutionContext executionContext) {
154 if (delegate instanceof ItemStream) {
155 ((ItemStream) delegate).update(executionContext);
156 }
157 }
158
159 }