1 /**********************************************************************
2 * ConnectionWriter.java
3 * created on 05.03.2005 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/ConnectionWriter.java,v $
5 * $Date: 2006/02/12 20:45:12 $
6 * $Revision: 1.7 $
7 *
8 * ====================================================================
9 *
10 * Copyright 2005-2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the ejoe framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30 package de.netseeker.ejoe;
31
32 import java.io.IOException;
33 import java.io.OutputStream;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.Channels;
36 import java.nio.channels.ClosedChannelException;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.WritableByteChannel;
39 import java.util.logging.Level;
40 import java.util.logging.Logger;
41
42 import de.netseeker.ejoe.adapter.AdapterFactory;
43 import de.netseeker.ejoe.adapter.SerializeAdapter;
44 import de.netseeker.ejoe.io.ByteBufferOutputStream;
45 import de.netseeker.ejoe.io.IOUtils;
46 import de.netseeker.ejoe.io.IncompleteIOException;
47
48 /***
49 * ConnectionWriter serializes a server answer and sends it through the established connection.
50 * @author netseeker
51 * @since 0.3.1
52 */
53 final class ConnectionWriter implements Runnable
54 {
55 private static final Logger log = Logger.getLogger(ConnectionReader.class.getName());
56
57 private final ChannelRegistrar _registrar;
58
59 private final ConnectionHeader _senderInfo;
60
61 private ConnectionHeader _receiverInfo;
62
63 /***
64 * @param serverInfo
65 * @param clientInfo
66 * @param channel
67 * @param adapter
68 * @param attachment
69 */
70 public ConnectionWriter(final ChannelRegistrar registrar, final ConnectionHeader senderInfo,
71 ConnectionHeader receiverInfo)
72 {
73 this._registrar = registrar;
74 this._senderInfo = senderInfo;
75 this._receiverInfo = receiverInfo;
76 }
77
78
79
80
81
82
83 public void run()
84 {
85 try
86 {
87 SerializeAdapter adapter = AdapterFactory.createAdapter(this._receiverInfo.getAdapterName());
88
89 if (this._senderInfo.hasNonBlockingReadWrite())
90 {
91 write(adapter);
92 }
93 else
94 {
95 writeBlocked(adapter);
96 }
97
98 if (!(this._senderInfo.isPersistent() && this._receiverInfo.isPersistent()))
99 {
100 log.log(Level.FINEST, "Non-persistent connection detected, closing connection...");
101 IOUtils.closeQuite(this._receiverInfo.getChannel());
102 this._receiverInfo = null;
103 }
104 else
105 {
106 log.log(Level.FINEST,
107 "Persistent connection detected, registering connections for further read events...");
108 this._receiverInfo.releaseWaitingBuffer();
109 this._receiverInfo.releaseAttachment();
110 this._registrar.register(this._receiverInfo, SelectionKey.OP_READ);
111 }
112 }
113 catch (ClosedChannelException cce)
114 {
115 log.log(Level.INFO, "Channel closed by client.", cce);
116 IOUtils.closeQuite(this._receiverInfo.getChannel());
117 this._receiverInfo = null;
118 }
119 catch (IncompleteIOException ioe)
120 {
121 if (this._registrar.isValid())
122 {
123 this._receiverInfo.setWaitingBuffer(ioe.getIOBuffer());
124 this._registrar.register(this._receiverInfo, SelectionKey.OP_WRITE);
125 }
126 }
127 catch (IOException e)
128 {
129 log.log(Level.SEVERE, "!!! IOException while sending data to client !!!", e);
130 IOUtils.closeQuite(this._receiverInfo.getChannel());
131 this._receiverInfo = null;
132 }
133 catch (Exception e)
134 {
135
136 log.log(Level.SEVERE, "!!! Unknown exception while sending data !!!", e);
137 IOUtils.closeQuite(this._receiverInfo.getChannel());
138 this._receiverInfo = null;
139 }
140 }
141
142 /***
143 * @throws IOException
144 */
145 private void write(SerializeAdapter adapter) throws IOException
146 {
147 ByteBufferOutputStream out = null;
148 ByteBuffer dataBuf = null;
149 WritableByteChannel channel = (WritableByteChannel) this._receiverInfo.getChannel();
150
151 try
152 {
153 if (this._receiverInfo.hasAttachment())
154 {
155 out = new ByteBufferOutputStream();
156 serialize(adapter, out);
157 this._receiverInfo.releaseAttachment();
158 dataBuf = out.getBackingBuffer();
159 dataBuf.flip();
160 IOUtils.writeHeader(this._receiverInfo, dataBuf);
161 IOUtils.nonBlockingWrite(channel, dataBuf);
162 }
163 else if (this._receiverInfo.hasWaitingBuffer())
164 {
165 dataBuf = this._receiverInfo.getWaitingBuffer();
166 IOUtils.nonBlockingWrite(channel, dataBuf);
167 }
168
169 else
170 {
171 IOUtils.writeHeader(this._receiverInfo, null);
172 }
173
174 log.log(Level.FINE, "Server response sent.");
175 }
176 finally
177 {
178 IOUtils.closeQuite(out);
179 }
180 }
181
182 /***
183 * @throws IOException
184 */
185 private void writeBlocked(SerializeAdapter adapter) throws IOException
186 {
187 log.log(Level.FINE, "Going to send server response... ");
188 serialize(adapter, Channels.newOutputStream((WritableByteChannel) this._receiverInfo.getChannel()));
189 log.log(Level.FINE, "Server response sent.");
190 }
191
192 /***
193 * @param out
194 * @throws IOException
195 */
196 private void serialize(SerializeAdapter adapter, OutputStream out) throws IOException
197 {
198 boolean compressed = this._senderInfo.hasCompression() && this._receiverInfo.hasCompression();
199 IOUtils.adapterSerialize(adapter, out, this._receiverInfo.getAttachment(), compressed, this._senderInfo
200 .getCompressionLevel());
201 }
202 }