1 /*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.springframework.batch.item.support;
17
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21
22 import org.springframework.batch.item.ExecutionContext;
23 import org.springframework.batch.item.ItemStream;
24 import org.springframework.batch.item.ItemStreamException;
25
26 /**
27 * Simple {@link ItemStream} that delegates to a list of other streams.
28 *
29 * @author Dave Syer
30 *
31 */
32 public class CompositeItemStream implements ItemStream {
33
34 private List<ItemStream> streams = new ArrayList<ItemStream>();
35
36 /**
37 * Public setter for the listeners.
38 *
39 * @param listeners
40 */
41 public void setStreams(ItemStream[] listeners) {
42 this.streams = Arrays.asList(listeners);
43 }
44
45 /**
46 * Register a {@link ItemStream} as one of the interesting providers under
47 * the provided key.
48 *
49 */
50 public void register(ItemStream stream) {
51 synchronized (streams) {
52 if (!streams.contains(stream)) {
53 streams.add(stream);
54 }
55 }
56 }
57
58 /**
59 *
60 */
61 public CompositeItemStream() {
62 super();
63 }
64
65 /**
66 * Simple aggregate {@link ExecutionContext} provider for the contributions
67 * registered under the given key.
68 *
69 * @see org.springframework.batch.item.ItemStream#update(ExecutionContext)
70 */
71 @Override
72 public void update(ExecutionContext executionContext) {
73 for (ItemStream itemStream : streams) {
74 itemStream.update(executionContext);
75 }
76 }
77
78 /**
79 * Broadcast the call to close.
80 * @throws ItemStreamException
81 */
82 @Override
83 public void close() throws ItemStreamException {
84 for (ItemStream itemStream : streams) {
85 itemStream.close();
86 }
87 }
88
89 /**
90 * Broadcast the call to open.
91 * @throws ItemStreamException
92 */
93 @Override
94 public void open(ExecutionContext executionContext) throws ItemStreamException {
95 for (ItemStream itemStream : streams) {
96 itemStream.open(executionContext);
97 }
98 }
99
100 }