1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.jms;
18
19 import javax.jms.Message;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.springframework.batch.item.ItemReader;
24 import org.springframework.beans.factory.InitializingBean;
25 import org.springframework.jms.core.JmsOperations;
26 import org.springframework.jms.core.JmsTemplate;
27 import org.springframework.util.Assert;
28
29
30
31
32
33
34
35
36
37
38
39
40
41 public class JmsItemReader<T> implements ItemReader<T>, InitializingBean {
42
43 protected Log logger = LogFactory.getLog(getClass());
44
45 protected Class<? extends T> itemType;
46
47 protected JmsOperations jmsTemplate;
48
49
50
51
52
53
54 public void setJmsTemplate(JmsOperations jmsTemplate) {
55 this.jmsTemplate = jmsTemplate;
56 if (jmsTemplate instanceof JmsTemplate) {
57 JmsTemplate template = (JmsTemplate) jmsTemplate;
58 Assert.isTrue(template.getReceiveTimeout() != JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT,
59 "JmsTemplate must have a receive timeout!");
60 Assert.isTrue(template.getDefaultDestination() != null || template.getDefaultDestinationName() != null,
61 "JmsTemplate must have a defaultDestination or defaultDestinationName!");
62 }
63 }
64
65
66
67
68
69
70
71
72
73
74
75 public void setItemType(Class<? extends T> itemType) {
76 this.itemType = itemType;
77 }
78
79 @Override
80 @SuppressWarnings("unchecked")
81 public T read() {
82 if (itemType != null && itemType.isAssignableFrom(Message.class)) {
83 return (T) jmsTemplate.receive();
84 }
85 Object result = jmsTemplate.receiveAndConvert();
86 if (itemType != null && result != null) {
87 Assert.state(itemType.isAssignableFrom(result.getClass()),
88 "Received message payload of wrong type: expected [" + itemType + "]");
89 }
90 return (T) result;
91 }
92
93 @Override
94 public void afterPropertiesSet() throws Exception {
95 Assert.notNull(this.jmsTemplate, "The 'jmsTemplate' is required.");
96 }
97 }