View Javadoc

1   /**********************************************************************
2    * ConnectionReader.java
3    * created on 05.03.2005 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/core/ConnectionReader.java,v $
5    * $Date: 2007/11/17 10:57:03 $
6    * $Revision: 1.7 $
7    *
8    * ====================================================================
9    *
10   *  Copyright 2005-2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the ejoe framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  package de.netseeker.ejoe.core;
31  
32  import java.io.EOFException;
33  import java.io.IOException;
34  import java.io.InputStream;
35  import java.net.SocketTimeoutException;
36  import java.nio.ByteBuffer;
37  import java.nio.channels.Channels;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.NonReadableChannelException;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.SocketChannel;
42  import java.rmi.RemoteException;
43  import java.text.ParseException;
44  import java.util.logging.Level;
45  import java.util.logging.Logger;
46  
47  import de.netseeker.ejoe.ConnectionHeader;
48  import de.netseeker.ejoe.EJConstants;
49  import de.netseeker.ejoe.ServerInfo;
50  import de.netseeker.ejoe.adapter.AdapterFactory;
51  import de.netseeker.ejoe.adapter.SerializeAdapter;
52  import de.netseeker.ejoe.cache.ByteBufferAllocator;
53  import de.netseeker.ejoe.handler.ServerHandler;
54  import de.netseeker.ejoe.http.HttpResponse;
55  import de.netseeker.ejoe.io.ByteBufferInputStream;
56  import de.netseeker.ejoe.io.ChannelInputStream;
57  import de.netseeker.ejoe.io.DataChannel;
58  import de.netseeker.ejoe.io.IOUtil;
59  import de.netseeker.ejoe.io.IncompleteIOException;
60  
61  /***
62   * ConnectionReader targets three jobs:
63   * <ul>
64   * <li>Read (partial) data from a established client connection</li>
65   * <li>Invoke the server handler to process the received data</li>
66   * <li>Hand over the response to the ConnectionProcessor for further processing (sending to the client)</li>
67   * </ul>
68   * 
69   * @author netseeker
70   * @since 0.3.0
71   */
72  public class ConnectionReader implements Runnable
73  {
74      private static final Logger    log = Logger.getLogger( ConnectionReader.class.getName() );
75  
76      private final ChannelRegistrar _registrar;
77  
78      private ConnectionHeader       _senderInfo;
79  
80      private ServerInfo             _receiverInfo;
81  
82      /***
83       * Creates a new instance of ConnectionReader
84       * 
85       * @param channel a socket channel ready for reading and writing
86       * @param adapter (de)serialize adapter
87       * @param handler the working horse handling the transported input object
88       */
89      public ConnectionReader(final ChannelRegistrar registrar, ServerInfo receiverInfo, ConnectionHeader senderInfo)
90      {
91          this._registrar = registrar;
92          this._receiverInfo = receiverInfo;
93          this._senderInfo = senderInfo;
94      }
95  
96      /*
97       * (non-Javadoc)
98       * 
99       * @see java.lang.Runnable#run()
100      */
101     public void run()
102     {
103         Object request = null;
104         SocketChannel channel = this._senderInfo.getChannel();
105 
106         try
107         {
108             if ( !this._senderInfo.isConnected() )
109             {
110                 log.log( Level.FINEST, "Handshaking client..." );
111                 // handshake the client, get infos about compression...
112                 this._senderInfo = DataChannel.getInstance().handshake( this._receiverInfo, channel,
113                                                                         EJConstants.EJOE_CONNECTION_TIMEOUT );
114             }
115 
116             if ( this._senderInfo != null )
117             {
118                 log.log( Level.FINEST, "Remote requested " + this._senderInfo.getAdapterName() );
119 
120                 SerializeAdapter adapter = AdapterFactory.createAdapter( this._senderInfo.getAdapterName() );
121 
122                 if ( this._receiverInfo.hasNonBlockingReadWrite() )
123                 {
124                     log.log( Level.FINEST, "Going to read client request on a non blocking socket..." );
125                     request = read( adapter );
126                 }
127                 else
128                 {
129                     log.log( Level.FINEST, "Going to read client request on a blocking socket..." );
130                     request = readBlocked( adapter );
131                 }
132 
133                 this._senderInfo.releaseAttachment();
134                 this._senderInfo.releaseWaitingBuffer();
135 
136                 log.log( Level.FINE, "Client request read." );
137 
138                 Object result = handleObject( request );
139 
140                 if ( this._registrar.isValid() )
141                 {
142                     this._senderInfo.setAttachment( result );
143                     this._registrar.register( this._senderInfo, SelectionKey.OP_WRITE );
144                 }
145             }
146             else
147             {
148                 log.log( Level.WARNING, "Connection timeout reached while waiting for Handshake complete. "
149                         + "Closing connection." );
150                 shutdownConnection( channel );
151             }
152         }
153         // partial read detected, registering for read again
154         catch ( IncompleteIOException ioe )
155         {
156             this._senderInfo.setWaitingBuffer( ioe.getIOBuffer() );
157             this._registrar.register( this._senderInfo, SelectionKey.OP_READ );
158         }
159         catch ( EOFException eof )
160         {
161             log.log( Level.FINEST, "EOF received while reading client data " + "- closing connection." );
162             shutdownConnection( channel );
163         }
164         catch ( ParseException pe )
165         {
166             log.log( Level.WARNING, "Unparseable connection header detected!", pe );
167             if ( !this._senderInfo.isHttp() )
168             {
169                 this._senderInfo.setAttachment( new RemoteException( "Unparseable connection header!", pe ) );
170             }
171             else
172             {
173                 this._senderInfo.setAttachment( new RemoteException( "Unparseable connection header!", pe ),
174                                                 HttpResponse.HTTP_BAD_REQUEST );
175             }
176 
177             this._registrar.register( this._senderInfo, SelectionKey.OP_WRITE );
178         }
179         catch ( SocketTimeoutException ste )
180         {
181             log.log( Level.FINE, "Timeout occured while waiting for client data!", ste );
182             shutdownConnection( channel );
183         }
184         catch ( RemoteException re )
185         {
186             if ( !this._senderInfo.isHttp() )
187             {
188                 this._senderInfo.setAttachment( re );
189             }
190             else
191             {
192                 this._senderInfo.setAttachment( re, HttpResponse.HTTP_INTERNAL_SERVER_ERROR );
193             }
194             this._registrar.register( this._senderInfo, SelectionKey.OP_WRITE );
195         }
196         // the client did something strange with the channel, probably the
197         // client is just too slow for
198         catch ( NonReadableChannelException e )
199         {
200             log.log( Level.INFO, "Connection probably closed by client." );
201             shutdownConnection( channel );
202         }
203         // the client did close the connection, or the connection was
204         // disconnected
205         catch ( ClosedChannelException cce )
206         {
207             log.log( Level.INFO, "Connection closed by client." );
208             shutdownConnection( channel );
209         }
210         catch ( Throwable e )
211         {
212             if ( !channel.isBlocking() && channel.isConnected() && channel.isOpen() )
213             {
214                 // something goes completely wrong!
215                 log.log( Level.WARNING, "!!! Exception while reading client data !!! "
216                         + "Probably the client just closed the connection but it could also be a serious failure.", e );
217             }
218             else
219             {
220                 // seems like a follow-up exception due to disconnect
221                 log.log( Level.INFO, "Connection propably closed by client.", e );
222             }
223 
224             shutdownConnection( channel );
225         }
226     }
227 
228     private void shutdownConnection( SocketChannel channel )
229     {
230         IOUtil.closeQuiet( channel );
231         this._senderInfo = null;
232     }
233 
234     /***
235      * Reads and deserialize a request object from a socket channel in non-blocking mode
236      * 
237      * @param adapter the deserialization adapter to use
238      * @return the deserialized request object
239      * @throws IOException
240      * @throws UnsupportedOperationException
241      */
242     private Object read( SerializeAdapter adapter ) throws IOException
243     {
244         ByteBufferInputStream in = null;
245         ByteBuffer dataBuf = null;
246         Object result = null;
247         DataChannel dataChannel = DataChannel.getInstance( this._senderInfo );
248 
249         try
250         {
251             if ( !this._senderInfo.hasWaitingBuffer() )
252             {
253                 int length = dataChannel.readHeader( this._senderInfo, EJConstants.EJOE_CONNECTION_TIMEOUT );
254 
255                 // maybe the DataChannel signals that it has already read
256                 // partial data
257                 if ( this._senderInfo.hasWaitingBuffer() )
258                 {
259                     dataBuf = this._senderInfo.getWaitingBuffer();
260                 }
261                 else
262                 {
263                     dataBuf = ByteBufferAllocator.allocate( length );
264                 }
265 
266                 log.log( Level.FINE, "Going to read client request with length: " + length );
267             }
268             else
269             {
270                 dataBuf = this._senderInfo.getWaitingBuffer();
271             }
272 
273             if ( dataBuf.hasRemaining() )
274             {
275                 DataChannel.nonBlockingRead( this._senderInfo.getChannel(), dataBuf );
276             }
277 
278             dataBuf.flip();
279 
280             if ( log.isLoggable( Level.FINE ) )
281             {
282                 byte[] tmp = new byte[dataBuf.remaining()];
283                 dataBuf.get( tmp );
284                 dataBuf.position( 0 );
285                 log.log( Level.FINE, "Client request read:\n" + new String( tmp, EJConstants.EJOE_DEFAULT_CHARSET ) );
286             }
287 
288             if ( dataBuf.hasRemaining() )
289             {
290                 try
291                 {
292                     // usual way: deserialize using a adapter
293                     if ( !this._senderInfo.isDirect() )
294                     {
295                         dataBuf = dataChannel.decode( dataBuf );
296                         in = new ByteBufferInputStream( dataBuf );
297                         result = deserialize( adapter, in );
298                     }
299                     // direct mode: don't deserialize, just copy and return the read
300                     // ByteBuffer
301                     else
302                     {
303                         // copy the bytebuffer
304                         ByteBuffer tmp = ByteBufferAllocator.allocate( dataBuf.remaining() );
305                         tmp.put( dataBuf );
306                         tmp.flip();
307                         result = dataChannel.decode( tmp );
308                     }
309                 }
310                 catch ( Throwable t )
311                 {
312                     throw new RemoteException( "Error while preprocessing request!", t );
313                 }
314                 finally
315                 {
316                     this._senderInfo.releaseWaitingBuffer();
317                 }
318             }
319         }
320         finally
321         {
322             IOUtil.closeQuiet( in );
323         }
324 
325         return result;
326     }
327 
328     /***
329      * @return the _receiverInfo
330      */
331     public ServerInfo getReceiverInfo()
332     {
333         return _receiverInfo;
334     }
335 
336     /***
337      * @return the _senderInfo
338      */
339     public ConnectionHeader getSenderInfo()
340     {
341         return _senderInfo;
342     }
343 
344     /***
345      * @return the _registrar
346      */
347     public ChannelRegistrar getRegistrar()
348     {
349         return _registrar;
350     }
351 
352     /***
353      * Reads and deserialize a request object from a socket channel in blocking mode
354      * 
355      * @return
356      * @throws IOException
357      * @throws UnsupportedOperationException
358      */
359     private Object readBlocked( SerializeAdapter adapter ) throws Exception
360     {
361         // usual way: deserialize using a adapter
362         if ( !this._senderInfo.isDirect() )
363         {
364             return deserialize( adapter, new ChannelInputStream( Channels
365                     .newInputStream( this._senderInfo.getChannel() ) ) );
366         }
367         // direct mode: don't deserialize, just copy and return the read
368         // ByteBuffer
369         else
370         {
371             return IOUtil
372                     .readDirect( new ChannelInputStream( Channels.newInputStream( this._senderInfo.getChannel() ) ) );
373         }
374     }
375 
376     /***
377      * @param clientInfo
378      * @param in
379      * @return
380      * @throws IOException
381      */
382     protected Object deserialize( SerializeAdapter adapter, InputStream in ) throws Exception
383     {
384         boolean compressed = !this._senderInfo.isHttp() && this._receiverInfo.hasCompression()
385                 && this._senderInfo.hasCompression();
386 
387         return IOUtil.adapterDeserialize( adapter, in, compressed );
388     }
389 
390     /***
391      * @param obj
392      * @return
393      */
394     protected Object handleObject( Object obj ) throws RemoteException
395     {
396         Object result = null;
397         ServerHandler handler = this._receiverInfo.getHandler();
398 
399         try
400         {
401             result = handler.handle( obj );
402         }
403         catch ( Throwable e )
404         {
405             log.log( Level.WARNING, "Exception in ServerHandler " + this._receiverInfo.getHandler() + " occured.", e );
406             throw new RemoteException( "Server failed to proceed your request!", e );
407         }
408 
409         return result;
410     }
411 }