1 /**********************************************************************
2 * ConnectionWriter.java
3 * created on 05.03.2005 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/core/ConnectionWriter.java,v $
5 * $Date: 2007/03/25 15:03:20 $
6 * $Revision: 1.4 $
7 *
8 * ====================================================================
9 *
10 * Copyright 2005-2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the ejoe framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30 package de.netseeker.ejoe.core;
31
32 import java.io.IOException;
33 import java.io.OutputStream;
34 import java.net.SocketTimeoutException;
35 import java.nio.ByteBuffer;
36 import java.nio.channels.Channels;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.SelectionKey;
39 import java.nio.channels.WritableByteChannel;
40 import java.rmi.RemoteException;
41 import java.util.logging.Level;
42 import java.util.logging.Logger;
43
44 import de.netseeker.ejoe.ConnectionHeader;
45 import de.netseeker.ejoe.EJConstants;
46 import de.netseeker.ejoe.ServerInfo;
47 import de.netseeker.ejoe.adapter.AdapterFactory;
48 import de.netseeker.ejoe.adapter.SerializeAdapter;
49 import de.netseeker.ejoe.cache.ByteBufferAllocator;
50 import de.netseeker.ejoe.io.ByteBufferOutputStream;
51 import de.netseeker.ejoe.io.DataChannel;
52 import de.netseeker.ejoe.io.IOUtil;
53 import de.netseeker.ejoe.io.IncompleteIOException;
54
55 /***
56 * ConnectionWriter serializes a server answer and sends it through the established connection.
57 *
58 * @author netseeker
59 * @since 0.3.1
60 */
61 final class ConnectionWriter implements Runnable
62 {
63 private static final Logger log = Logger.getLogger( ConnectionReader.class.getName() );
64
65 private final ChannelRegistrar _registrar;
66
67 private final ConnectionHeader _senderInfo;
68
69 private ConnectionHeader _receiverInfo;
70
71 /***
72 * Creates a new instance of ConnectionWriter
73 *
74 * @param serverInfo
75 * @param clientInfo
76 * @param channel
77 * @param adapter
78 * @param attachment
79 */
80 public ConnectionWriter(final ChannelRegistrar registrar, final ServerInfo senderInfo, ConnectionHeader receiverInfo)
81 {
82 this._senderInfo = senderInfo;
83 this._receiverInfo = receiverInfo;
84 this._registrar = registrar;
85 }
86
87
88
89
90
91
92 public void run()
93 {
94 try
95 {
96 if ( this._senderInfo.hasNonBlockingReadWrite() )
97 {
98 write();
99 }
100 else
101 {
102 writeBlocked();
103 }
104
105 this._receiverInfo.releaseWaitingBuffer();
106 this._receiverInfo.releaseAttachment();
107
108 if ( !this._senderInfo.isPersistent() || !this._receiverInfo.isPersistent() )
109 {
110 log.log( Level.FINEST, "Non-persistent connection detected, closing connection..." );
111 shutdownConnection();
112 }
113 else if ( this._registrar.isValid() )
114 {
115 log.log( Level.FINEST,
116 "Persistent connection detected, registering connections for further read events..." );
117 this._registrar.register( this._receiverInfo, SelectionKey.OP_READ );
118 }
119 }
120 catch ( ClosedChannelException cce )
121 {
122 log.log( Level.INFO, "Channel closed by client.", cce );
123 shutdownConnection();
124 }
125 catch ( IncompleteIOException ioe )
126 {
127 if ( this._registrar.isValid() )
128 {
129 this._receiverInfo.setWaitingBuffer( ioe.getIOBuffer() );
130 this._registrar.register( this._receiverInfo, SelectionKey.OP_WRITE );
131 }
132 }
133 catch ( SocketTimeoutException ste )
134 {
135 log.log( Level.FINE, "Timeout occured while sending data!", ste );
136 shutdownConnection();
137 }
138 catch ( RemoteException re )
139 {
140 this._receiverInfo.releaseWaitingBuffer();
141 this._receiverInfo.setAttachment( re );
142 this._registrar.register( this._receiverInfo, SelectionKey.OP_WRITE );
143 }
144 catch ( IOException e )
145 {
146 log.log( Level.WARNING,
147 "!!! IOException while sending data to client propably the client just closed the connection!!!",
148 e );
149 shutdownConnection();
150 }
151 catch ( Throwable e )
152 {
153
154 log.log( Level.SEVERE, "!!! Unknown exception while sending data !!!", e );
155 shutdownConnection();
156 }
157 }
158
159 private void shutdownConnection()
160 {
161 IOUtil.closeQuiet( this._receiverInfo.getChannel() );
162 this._receiverInfo = null;
163 }
164
165 /***
166 * @throws IOException
167 */
168 private void write() throws IOException
169 {
170 ByteBufferOutputStream out = null;
171 ByteBuffer dataBuf = null;
172 SerializeAdapter adapter = this._receiverInfo.getAdapterName() != null ? AdapterFactory
173 .createAdapter( this._receiverInfo.getAdapterName() ) : null;
174 WritableByteChannel channel = this._receiverInfo.getChannel();
175 DataChannel dataChannel = DataChannel.getInstance( this._receiverInfo );
176
177 try
178 {
179 if ( this._receiverInfo.hasAttachment() )
180 {
181
182 if ( !this._receiverInfo.isDirect() )
183 {
184 out = new ByteBufferOutputStream();
185 serialize( adapter, out );
186 dataBuf = out.getBackingBuffer();
187 }
188
189 else
190 {
191 ByteBuffer tmp = (ByteBuffer) this._receiverInfo.getAttachment();
192 if ( tmp.position() > 0 )
193 {
194 tmp.flip();
195 }
196 dataBuf = tmp.duplicate();
197 ByteBufferAllocator.collect( tmp );
198 }
199
200 if ( dataBuf.position() > 0 )
201 {
202 dataBuf.flip();
203 }
204
205 this._receiverInfo.releaseAttachment();
206
207 dataChannel.writeHeader( this._receiverInfo, dataBuf, EJConstants.EJOE_CONNECTION_TIMEOUT );
208
209 if ( log.isLoggable( Level.FINE ) )
210 {
211 byte[] tmp = new byte[dataBuf.remaining()];
212 dataBuf.get( tmp );
213 dataBuf.position( 0 );
214 log.log( Level.FINE, "Going to send server response:\n"
215 + new String( tmp, EJConstants.EJOE_DEFAULT_CHARSET ) );
216 }
217
218 dataChannel.nonBlockingWrite( channel, dataBuf );
219 }
220 else if ( this._receiverInfo.hasWaitingBuffer() )
221 {
222 dataBuf = this._receiverInfo.getWaitingBuffer();
223 dataChannel.nonBlockingWrite( channel, dataBuf );
224 }
225
226 else
227 {
228 dataChannel.writeHeader( this._receiverInfo, null, EJConstants.EJOE_CONNECTION_TIMEOUT );
229 }
230
231 log.log( Level.FINE, "Server response sent." );
232 }
233 finally
234 {
235 IOUtil.closeQuiet( out );
236 }
237 }
238
239 /***
240 * @throws IOException
241 */
242 private void writeBlocked() throws IOException
243 {
244 SerializeAdapter adapter = this._receiverInfo.getAdapterName() != null ? AdapterFactory
245 .createAdapter( this._receiverInfo.getAdapterName() ) : null;
246
247 log.log( Level.FINE, "Going to send server response... " );
248 if ( !this._receiverInfo.isDirect() )
249 {
250 serialize( adapter, Channels.newOutputStream( this._receiverInfo.getChannel() ) );
251 }
252
253 else
254 {
255 ByteBuffer tmp = (ByteBuffer) this._receiverInfo.getAttachment();
256 if ( tmp.position() > 0 )
257 {
258 tmp.flip();
259 }
260 IOUtil.writeDirect( Channels.newOutputStream( this._receiverInfo.getChannel() ), tmp );
261 }
262
263 log.log( Level.FINE, "Server response sent." );
264 }
265
266 /***
267 * @param out
268 * @throws IOException
269 */
270 private void serialize( SerializeAdapter adapter, OutputStream out ) throws IOException
271 {
272 boolean compressed = this._senderInfo.hasCompression() && this._receiverInfo.hasCompression();
273 try
274 {
275 IOUtil.adapterSerialize( adapter, out, this._receiverInfo.getAttachment(), compressed, this._senderInfo
276 .getCompressionLevel() );
277 }
278 catch ( IOException ioe )
279 {
280 throw ioe;
281 }
282 catch ( Throwable t )
283 {
284 throw new RemoteException( "The server encounteres an error while serializing the server response!", t );
285 }
286 }
287 }