1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.support.transaction;
17
18 import java.io.IOException;
19 import java.io.Writer;
20 import java.nio.ByteBuffer;
21 import java.nio.channels.FileChannel;
22
23 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
24 import org.springframework.transaction.support.TransactionSynchronizationManager;
25
26
27
28
29
30
31
32
33
34
35
36 public class TransactionAwareBufferedWriter extends Writer {
37
38 private static final String BUFFER_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".BUFFER_KEY";
39
40 private static final String CLOSE_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".CLOSE_KEY";
41
42 private final String bufferKey;
43
44 private final String closeKey;
45
46 private FileChannel channel;
47
48 private final Runnable closeCallback;
49
50
51 private static final String DEFAULT_CHARSET = "UTF-8";
52
53 private String encoding = DEFAULT_CHARSET;
54
55
56
57
58
59
60
61
62
63 public TransactionAwareBufferedWriter(FileChannel channel, Runnable closeCallback) {
64 super();
65 this.channel = channel;
66 this.closeCallback = closeCallback;
67 this.bufferKey = BUFFER_KEY_PREFIX + "." + hashCode();
68 this.closeKey = CLOSE_KEY_PREFIX + "." + hashCode();
69 }
70
71 public void setEncoding(String encoding) {
72 this.encoding = encoding;
73 }
74
75
76
77
78 private StringBuffer getCurrentBuffer() {
79
80 if (!TransactionSynchronizationManager.hasResource(bufferKey)) {
81
82 TransactionSynchronizationManager.bindResource(bufferKey, new StringBuffer());
83
84 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
85 @Override
86 public void afterCompletion(int status) {
87 clear();
88 }
89
90 @Override
91 public void beforeCommit(boolean readOnly) {
92 try {
93 if(!readOnly) {
94 complete();
95 }
96 }
97 catch (IOException e) {
98 throw new FlushFailedException("Could not write to output buffer", e);
99 }
100 }
101
102 private void complete() throws IOException {
103 StringBuffer buffer = (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
104 if (buffer != null) {
105 String string = buffer.toString();
106 byte[] bytes = string.getBytes(encoding);
107 int bufferLength = bytes.length;
108 ByteBuffer bb = ByteBuffer.wrap(bytes);
109 int bytesWritten = channel.write(bb);
110 if(bytesWritten != bufferLength) {
111 throw new IOException("All bytes to be written were not successfully written");
112 }
113 if (TransactionSynchronizationManager.hasResource(closeKey)) {
114 closeCallback.run();
115 }
116 }
117 }
118
119 private void clear() {
120 if (TransactionSynchronizationManager.hasResource(bufferKey)) {
121 TransactionSynchronizationManager.unbindResource(bufferKey);
122 }
123 if (TransactionSynchronizationManager.hasResource(closeKey)) {
124 TransactionSynchronizationManager.unbindResource(closeKey);
125 }
126 }
127
128 });
129
130 }
131
132 return (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
133
134 }
135
136
137
138
139
140
141
142 public long getBufferSize() {
143 if (!transactionActive()) {
144 return 0L;
145 }
146 return getCurrentBuffer().length();
147 }
148
149
150
151
152 private boolean transactionActive() {
153 return TransactionSynchronizationManager.isActualTransactionActive();
154 }
155
156
157
158
159
160
161 @Override
162 public void close() throws IOException {
163 if (transactionActive()) {
164 if (getCurrentBuffer().length() > 0) {
165 TransactionSynchronizationManager.bindResource(closeKey, Boolean.TRUE);
166 }
167 return;
168 }
169 closeCallback.run();
170 }
171
172
173
174
175
176
177 @Override
178 public void flush() throws IOException {
179 if (!transactionActive()) {
180 channel.force(false);
181 }
182 }
183
184
185
186
187
188
189 @Override
190 public void write(char[] cbuf, int off, int len) throws IOException {
191
192 if (!transactionActive()) {
193 char [] subArray = new char[len];
194 System.arraycopy(cbuf, off, subArray, 0, len);
195 byte[] bytes = new String(subArray).getBytes(encoding);
196 int length = bytes.length;
197 ByteBuffer bb = ByteBuffer.wrap(bytes);
198 int bytesWritten = channel.write(bb);
199 if(bytesWritten != length) {
200 throw new IOException("Unable to write all data. Bytes to write: " + len + ". Bytes written: " + bytesWritten);
201 }
202 return;
203 }
204
205 StringBuffer buffer = getCurrentBuffer();
206 buffer.append(cbuf, off, len);
207 }
208 }