1 /**********************************************************************
2 * ConnectionReader.java
3 * created on 05.03.2005 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/core/ConnectionReader.java,v $
5 * $Date: 2007/11/17 10:57:03 $
6 * $Revision: 1.7 $
7 *
8 * ====================================================================
9 *
10 * Copyright 2005-2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the ejoe framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30 package de.netseeker.ejoe.core;
31
32 import java.io.EOFException;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.SocketTimeoutException;
36 import java.nio.ByteBuffer;
37 import java.nio.channels.Channels;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.NonReadableChannelException;
40 import java.nio.channels.SelectionKey;
41 import java.nio.channels.SocketChannel;
42 import java.rmi.RemoteException;
43 import java.text.ParseException;
44 import java.util.logging.Level;
45 import java.util.logging.Logger;
46
47 import de.netseeker.ejoe.ConnectionHeader;
48 import de.netseeker.ejoe.EJConstants;
49 import de.netseeker.ejoe.ServerInfo;
50 import de.netseeker.ejoe.adapter.AdapterFactory;
51 import de.netseeker.ejoe.adapter.SerializeAdapter;
52 import de.netseeker.ejoe.cache.ByteBufferAllocator;
53 import de.netseeker.ejoe.handler.ServerHandler;
54 import de.netseeker.ejoe.http.HttpResponse;
55 import de.netseeker.ejoe.io.ByteBufferInputStream;
56 import de.netseeker.ejoe.io.ChannelInputStream;
57 import de.netseeker.ejoe.io.DataChannel;
58 import de.netseeker.ejoe.io.IOUtil;
59 import de.netseeker.ejoe.io.IncompleteIOException;
60
61 /***
62 * ConnectionReader targets three jobs:
63 * <ul>
64 * <li>Read (partial) data from a established client connection</li>
65 * <li>Invoke the server handler to process the received data</li>
66 * <li>Hand over the response to the ConnectionProcessor for further processing (sending to the client)</li>
67 * </ul>
68 *
69 * @author netseeker
70 * @since 0.3.0
71 */
72 public class ConnectionReader implements Runnable
73 {
74 private static final Logger log = Logger.getLogger( ConnectionReader.class.getName() );
75
76 private final ChannelRegistrar _registrar;
77
78 private ConnectionHeader _senderInfo;
79
80 private ServerInfo _receiverInfo;
81
82 /***
83 * Creates a new instance of ConnectionReader
84 *
85 * @param channel a socket channel ready for reading and writing
86 * @param adapter (de)serialize adapter
87 * @param handler the working horse handling the transported input object
88 */
89 public ConnectionReader(final ChannelRegistrar registrar, ServerInfo receiverInfo, ConnectionHeader senderInfo)
90 {
91 this._registrar = registrar;
92 this._receiverInfo = receiverInfo;
93 this._senderInfo = senderInfo;
94 }
95
96
97
98
99
100
101 public void run()
102 {
103 Object request = null;
104 SocketChannel channel = this._senderInfo.getChannel();
105
106 try
107 {
108 if ( !this._senderInfo.isConnected() )
109 {
110 log.log( Level.FINEST, "Handshaking client..." );
111
112 this._senderInfo = DataChannel.getInstance().handshake( this._receiverInfo, channel,
113 EJConstants.EJOE_CONNECTION_TIMEOUT );
114 }
115
116 if ( this._senderInfo != null )
117 {
118 log.log( Level.FINEST, "Remote requested " + this._senderInfo.getAdapterName() );
119
120 SerializeAdapter adapter = AdapterFactory.createAdapter( this._senderInfo.getAdapterName() );
121
122 if ( this._receiverInfo.hasNonBlockingReadWrite() )
123 {
124 log.log( Level.FINEST, "Going to read client request on a non blocking socket..." );
125 request = read( adapter );
126 }
127 else
128 {
129 log.log( Level.FINEST, "Going to read client request on a blocking socket..." );
130 request = readBlocked( adapter );
131 }
132
133 this._senderInfo.releaseAttachment();
134 this._senderInfo.releaseWaitingBuffer();
135
136 log.log( Level.FINE, "Client request read." );
137
138 Object result = handleObject( request );
139
140 if ( this._registrar.isValid() )
141 {
142 this._senderInfo.setAttachment( result );
143 this._registrar.register( this._senderInfo, SelectionKey.OP_WRITE );
144 }
145 }
146 else
147 {
148 log.log( Level.WARNING, "Connection timeout reached while waiting for Handshake complete. "
149 + "Closing connection." );
150 shutdownConnection( channel );
151 }
152 }
153
154 catch ( IncompleteIOException ioe )
155 {
156 this._senderInfo.setWaitingBuffer( ioe.getIOBuffer() );
157 this._registrar.register( this._senderInfo, SelectionKey.OP_READ );
158 }
159 catch ( EOFException eof )
160 {
161 log.log( Level.FINEST, "EOF received while reading client data " + "- closing connection." );
162 shutdownConnection( channel );
163 }
164 catch ( ParseException pe )
165 {
166 log.log( Level.WARNING, "Unparseable connection header detected!", pe );
167 if ( !this._senderInfo.isHttp() )
168 {
169 this._senderInfo.setAttachment( new RemoteException( "Unparseable connection header!", pe ) );
170 }
171 else
172 {
173 this._senderInfo.setAttachment( new RemoteException( "Unparseable connection header!", pe ),
174 HttpResponse.HTTP_BAD_REQUEST );
175 }
176
177 this._registrar.register( this._senderInfo, SelectionKey.OP_WRITE );
178 }
179 catch ( SocketTimeoutException ste )
180 {
181 log.log( Level.FINE, "Timeout occured while waiting for client data!", ste );
182 shutdownConnection( channel );
183 }
184 catch ( RemoteException re )
185 {
186 if ( !this._senderInfo.isHttp() )
187 {
188 this._senderInfo.setAttachment( re );
189 }
190 else
191 {
192 this._senderInfo.setAttachment( re, HttpResponse.HTTP_INTERNAL_SERVER_ERROR );
193 }
194 this._registrar.register( this._senderInfo, SelectionKey.OP_WRITE );
195 }
196
197
198 catch ( NonReadableChannelException e )
199 {
200 log.log( Level.INFO, "Connection probably closed by client." );
201 shutdownConnection( channel );
202 }
203
204
205 catch ( ClosedChannelException cce )
206 {
207 log.log( Level.INFO, "Connection closed by client." );
208 shutdownConnection( channel );
209 }
210 catch ( Throwable e )
211 {
212 if ( !channel.isBlocking() && channel.isConnected() && channel.isOpen() )
213 {
214
215 log.log( Level.WARNING, "!!! Exception while reading client data !!! "
216 + "Probably the client just closed the connection but it could also be a serious failure.", e );
217 }
218 else
219 {
220
221 log.log( Level.INFO, "Connection propably closed by client.", e );
222 }
223
224 shutdownConnection( channel );
225 }
226 }
227
228 private void shutdownConnection( SocketChannel channel )
229 {
230 IOUtil.closeQuiet( channel );
231 this._senderInfo = null;
232 }
233
234 /***
235 * Reads and deserialize a request object from a socket channel in non-blocking mode
236 *
237 * @param adapter the deserialization adapter to use
238 * @return the deserialized request object
239 * @throws IOException
240 * @throws UnsupportedOperationException
241 */
242 private Object read( SerializeAdapter adapter ) throws IOException
243 {
244 ByteBufferInputStream in = null;
245 ByteBuffer dataBuf = null;
246 Object result = null;
247 DataChannel dataChannel = DataChannel.getInstance( this._senderInfo );
248
249 try
250 {
251 if ( !this._senderInfo.hasWaitingBuffer() )
252 {
253 int length = dataChannel.readHeader( this._senderInfo, EJConstants.EJOE_CONNECTION_TIMEOUT );
254
255
256
257 if ( this._senderInfo.hasWaitingBuffer() )
258 {
259 dataBuf = this._senderInfo.getWaitingBuffer();
260 }
261 else
262 {
263 dataBuf = ByteBufferAllocator.allocate( length );
264 }
265
266 log.log( Level.FINE, "Going to read client request with length: " + length );
267 }
268 else
269 {
270 dataBuf = this._senderInfo.getWaitingBuffer();
271 }
272
273 if ( dataBuf.hasRemaining() )
274 {
275 DataChannel.nonBlockingRead( this._senderInfo.getChannel(), dataBuf );
276 }
277
278 dataBuf.flip();
279
280 if ( log.isLoggable( Level.FINE ) )
281 {
282 byte[] tmp = new byte[dataBuf.remaining()];
283 dataBuf.get( tmp );
284 dataBuf.position( 0 );
285 log.log( Level.FINE, "Client request read:\n" + new String( tmp, EJConstants.EJOE_DEFAULT_CHARSET ) );
286 }
287
288 if ( dataBuf.hasRemaining() )
289 {
290 try
291 {
292
293 if ( !this._senderInfo.isDirect() )
294 {
295 dataBuf = dataChannel.decode( dataBuf );
296 in = new ByteBufferInputStream( dataBuf );
297 result = deserialize( adapter, in );
298 }
299
300
301 else
302 {
303
304 ByteBuffer tmp = ByteBufferAllocator.allocate( dataBuf.remaining() );
305 tmp.put( dataBuf );
306 tmp.flip();
307 result = dataChannel.decode( tmp );
308 }
309 }
310 catch ( Throwable t )
311 {
312 throw new RemoteException( "Error while preprocessing request!", t );
313 }
314 finally
315 {
316 this._senderInfo.releaseWaitingBuffer();
317 }
318 }
319 }
320 finally
321 {
322 IOUtil.closeQuiet( in );
323 }
324
325 return result;
326 }
327
328 /***
329 * @return the _receiverInfo
330 */
331 public ServerInfo getReceiverInfo()
332 {
333 return _receiverInfo;
334 }
335
336 /***
337 * @return the _senderInfo
338 */
339 public ConnectionHeader getSenderInfo()
340 {
341 return _senderInfo;
342 }
343
344 /***
345 * @return the _registrar
346 */
347 public ChannelRegistrar getRegistrar()
348 {
349 return _registrar;
350 }
351
352 /***
353 * Reads and deserialize a request object from a socket channel in blocking mode
354 *
355 * @return
356 * @throws IOException
357 * @throws UnsupportedOperationException
358 */
359 private Object readBlocked( SerializeAdapter adapter ) throws Exception
360 {
361
362 if ( !this._senderInfo.isDirect() )
363 {
364 return deserialize( adapter, new ChannelInputStream( Channels
365 .newInputStream( this._senderInfo.getChannel() ) ) );
366 }
367
368
369 else
370 {
371 return IOUtil
372 .readDirect( new ChannelInputStream( Channels.newInputStream( this._senderInfo.getChannel() ) ) );
373 }
374 }
375
376 /***
377 * @param clientInfo
378 * @param in
379 * @return
380 * @throws IOException
381 */
382 protected Object deserialize( SerializeAdapter adapter, InputStream in ) throws Exception
383 {
384 boolean compressed = !this._senderInfo.isHttp() && this._receiverInfo.hasCompression()
385 && this._senderInfo.hasCompression();
386
387 return IOUtil.adapterDeserialize( adapter, in, compressed );
388 }
389
390 /***
391 * @param obj
392 * @return
393 */
394 protected Object handleObject( Object obj ) throws RemoteException
395 {
396 Object result = null;
397 ServerHandler handler = this._receiverInfo.getHandler();
398
399 try
400 {
401 result = handler.handle( obj );
402 }
403 catch ( Throwable e )
404 {
405 log.log( Level.WARNING, "Exception in ServerHandler " + this._receiverInfo.getHandler() + " occured.", e );
406 throw new RemoteException( "Server failed to proceed your request!", e );
407 }
408
409 return result;
410 }
411 }