1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.item.database;
17
18 import java.util.List;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
23 import org.springframework.beans.factory.InitializingBean;
24 import org.springframework.util.Assert;
25 import org.springframework.util.ClassUtils;
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 public abstract class AbstractPagingItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
43 InitializingBean {
44
45 protected Log logger = LogFactory.getLog(getClass());
46
47 private volatile boolean initialized = false;
48
49 private int pageSize = 10;
50
51 private volatile int current = 0;
52
53 private volatile int page = 0;
54
55 protected volatile List<T> results;
56
57 private Object lock = new Object();
58
59 public AbstractPagingItemReader() {
60 setName(ClassUtils.getShortName(AbstractPagingItemReader.class));
61 }
62
63
64
65
66
67 public int getPage() {
68 return page;
69 }
70
71
72
73
74
75 public int getPageSize() {
76 return pageSize;
77 }
78
79
80
81
82
83
84 public void setPageSize(int pageSize) {
85 this.pageSize = pageSize;
86 }
87
88
89
90
91
92 @Override
93 public void afterPropertiesSet() throws Exception {
94 Assert.isTrue(pageSize > 0, "pageSize must be greater than zero");
95 }
96
97 @Override
98 protected T doRead() throws Exception {
99
100 synchronized (lock) {
101
102 if (results == null || current >= pageSize) {
103
104 if (logger.isDebugEnabled()) {
105 logger.debug("Reading page " + getPage());
106 }
107
108 doReadPage();
109 page++;
110 if (current >= pageSize) {
111 current = 0;
112 }
113
114 }
115
116 int next = current++;
117 if (next < results.size()) {
118 return results.get(next);
119 }
120 else {
121 return null;
122 }
123
124 }
125
126 }
127
128 abstract protected void doReadPage();
129
130 @Override
131 protected void doOpen() throws Exception {
132
133 Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
134 initialized = true;
135
136 }
137
138 @Override
139 protected void doClose() throws Exception {
140
141 synchronized (lock) {
142 initialized = false;
143 current = 0;
144 page = 0;
145 results = null;
146 }
147
148 }
149
150 @Override
151 protected void jumpToItem(int itemIndex) throws Exception {
152
153 synchronized (lock) {
154 page = itemIndex / pageSize;
155 current = itemIndex % pageSize;
156 }
157
158 doJumpToPage(itemIndex);
159
160 if (logger.isDebugEnabled()) {
161 logger.debug("Jumping to page " + getPage() + " and index " + current);
162 }
163
164 }
165
166 abstract protected void doJumpToPage(int itemIndex);
167
168 }