View Javadoc

1   /**********************************************************************
2    * EJClient.java
3    * created on 08.08.2004 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/EJClient.java,v $
5    * $Date: 2007/11/17 10:59:41 $
6    * $Revision: 1.100 $
7    *
8    * ====================================================================
9    *
10   *  Copyright 2005-2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the ejoe framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  
31  package de.netseeker.ejoe;
32  
33  import java.io.FileInputStream;
34  import java.io.IOException;
35  import java.io.InputStream;
36  import java.io.OutputStream;
37  import java.io.Serializable;
38  import java.net.InetSocketAddress;
39  import java.net.Socket;
40  import java.nio.ByteBuffer;
41  import java.nio.channels.Channels;
42  import java.nio.channels.SelectionKey;
43  import java.nio.channels.Selector;
44  import java.nio.channels.SocketChannel;
45  import java.rmi.RemoteException;
46  import java.text.ParseException;
47  import java.util.Iterator;
48  import java.util.Properties;
49  import java.util.Random;
50  import java.util.logging.Level;
51  import java.util.logging.Logger;
52  
53  import de.netseeker.ejoe.adapter.AdapterFactory;
54  import de.netseeker.ejoe.adapter.SerializeAdapter;
55  import de.netseeker.ejoe.cache.ByteBufferAllocator;
56  import de.netseeker.ejoe.core.InJvmProcessor;
57  import de.netseeker.ejoe.core.InJvmProcessorFactory;
58  import de.netseeker.ejoe.io.ByteBufferInputStream;
59  import de.netseeker.ejoe.io.ByteBufferOutputStream;
60  import de.netseeker.ejoe.io.ChannelInputStream;
61  import de.netseeker.ejoe.io.DataChannel;
62  import de.netseeker.ejoe.io.IOUtil;
63  import de.netseeker.ejoe.io.IncompleteIOException;
64  
65  /***
66   * This is the client component of EJOE. You have to use this component to send and retrieve data to/from a EJOE server.
67   * <p>
68   * Basic usage:
69   * 
70   * <pre>
71   *                             EJClient client = new EJClient(&quot;127.0.0.1&quot;, 12577); //you could also use EJConstants.EJOE_PORT
72   *                             Object response = client.execute(&quot;Hello World&quot;);
73   *                             ...
74   * </pre>
75   * 
76   * </p>
77   * <p>
78   * Usage with persistent connections:
79   * 
80   * <pre>
81   *                             EJClient client = new EJClient(&quot;127.0.0.1&quot;, 12577); //you could also use EJConstants.EJOE_PORT
82   *                             client.enablePersistentConnection(true); //request a persistent connection to the server
83   *                             Object response = client.execute(&quot;Hello World&quot;);
84   *                             ...
85   *                             response = client.execute(&quot;Bye World&quot;);
86   *                             //close the connection
87   *                             client.close();
88   *                             ...
89   * </pre>
90   * 
91   * </p>
92   * <p>
93   * Usage with In-JVM EJServer:
94   * 
95   * <pre>
96   *                             EJClient client = new EJClient(&quot;127.0.0.1&quot;, 12577); //you could also use EJConstants.EJOE_PORT
97   *                             client.setInJVM(true); //enable In-JVM mode, no socket connections will be used
98   *                             Object response = client.execute(&quot;Hello World&quot;);
99   *                             ...
100  *                             response = client.execute(&quot;Bye World&quot;);
101  *                             ...
102  * </pre>
103  * 
104  * </p>
105  * 
106  * @author netseeker aka Michael Manske
107  * @since 0.3.0
108  */
109 public class EJClient implements Serializable
110 {
111     private static final long             serialVersionUID   = 1L;
112 
113     private transient static final Logger log                = Logger.getLogger( EJClient.class.getName() );
114 
115     private SerializeAdapter              _adapter;
116 
117     private String                        _host              = "127.0.0.1";
118 
119     private int                           _port              = EJConstants.EJOE_PORT;
120 
121     private int                           _connectionTimeout = EJConstants.EJOE_CONNECTION_TIMEOUT;
122 
123     private boolean                       _inJVM             = false;
124 
125     private final ConnectionHeader        _clientInfo        = new ConnectionHeader( true );
126 
127     private transient ConnectionHeader    _serverInfo;
128 
129     private transient SocketChannel       _channel;
130 
131     private transient Selector            _selector;
132 
133     private Object                        _mutex             = new Object();
134 
135     private static Random                 _idGenerator       = new Random();
136 
137     /***
138      * Creates an instance of the EJOE client pre-configured with settings from the global ejoe.properties file. You
139      * MUST provide such an property file on the classpath to use this constructor.
140      */
141     public EJClient()
142     {
143         Properties props = new Properties();
144         try
145         {
146             props.load( EJClient.class.getResourceAsStream( "/ejoe.properties" ) );
147             loadProperties( props );
148         }
149         catch ( IOException e )
150         {
151             log.log( Level.SEVERE, "ejoe.properties could not be read! "
152                     + "Make sure you have placed a valid properties file in your classpath.", e );
153             throw new RuntimeException( e );
154         }
155     }
156 
157     /***
158      * Creates an instance of the EJOE client pre-configured with settings from a global properties file.
159      * 
160      * @param pathToConfigFile path to the properties file
161      */
162     public EJClient(String pathToConfigFile)
163     {
164         Properties props = new Properties();
165         FileInputStream fis = null;
166         try
167         {
168             fis = new FileInputStream( pathToConfigFile );
169             props.load( fis );
170             loadProperties( props );
171         }
172         catch ( IOException e )
173         {
174             log.log( Level.SEVERE, "ejoe.properties could not be read! "
175                     + "Make sure you have placed a valid properties file in your classpath.", e );
176             throw new RuntimeException( e );
177         }
178         finally
179         {
180             IOUtil.closeQuiet( fis );
181         }
182     }
183 
184     /***
185      * Creates an instance of the EJOE client pre-configured with settings from the given properties store.
186      * 
187      * @param properties properties store containing EJClient settings
188      */
189     public EJClient(Properties properties)
190     {
191         loadProperties( properties );
192     }
193 
194     /***
195      * Creates an instance of the EJOE client preconfigured to use an instance of
196      * de.netseeker.ejoe.adapter.ObjectStreamAdapter for (de)serializing.
197      * 
198      * @param host address (dns name or ip address) of the EJOE server
199      * @param port port which the EJOE server listens to
200      */
201     public EJClient(String host, int port)
202     {
203         this._host = host;
204         this._port = port;
205         this._clientInfo.setHost( _host + ':' + _port );
206         this._adapter = AdapterFactory.createAdapter( EJConstants.EJOE_DEFAULT_ADAPTER.getName() );
207         this._clientInfo.setAdapterName( EJConstants.EJOE_DEFAULT_ADAPTER.getName() );
208     }
209 
210     /***
211      * Creates an instance of the EJOE client.
212      * 
213      * @param host address (dns name or ip address) of the EJOE server
214      * @param port port which the EJOE server listens to
215      * @param adapter the adapter used for (de)serializing input paramter objects for the server and the return values
216      */
217     public EJClient(String host, int port, final SerializeAdapter adapter)
218     {
219         this._host = host;
220         this._port = port;
221         this._clientInfo.setHost( _host + ':' + _port );
222         this._adapter = adapter;
223         this._clientInfo.setAdapterName( adapter.getClass().getName() );
224     }
225 
226     /***
227      * Creates an instance of the EJOE client.
228      * 
229      * @param host address (dns name or ip address) of the EJOE server
230      * @param port port which the EJOE server listens to
231      * @param adapter the adapter used for (de)serializing input paramter objects for the server and the return values
232      * @param isPersistent whether EJClient should use a persistent connection to the server or not
233      * @param isHttp whether EJClient should wrap requests in HTTP headers or not (lets EJClient socket data look like a
234      *            HTTP 1.1 browser communication)
235      */
236     public EJClient(String host, int port, final SerializeAdapter adapter, boolean isPersistent, boolean isHttp,
237                     boolean useCompression)
238     {
239         this( host, port, adapter );
240         enablePersistentConnection( isPersistent );
241         enableCompression( useCompression );
242         enableHttpPackaging( isHttp );
243     }
244 
245     /***
246      * Sets the connection timeout used when waiting for server responses. A value of zero (the default) blocks
247      * indefinitely.
248      * 
249      * @param timeout the new timeout in milliseconds
250      */
251     public synchronized void setConnectionTimeout( int timeout )
252     {
253         this._connectionTimeout = timeout;
254     }
255 
256     /***
257      * Tells this client to use compression (if supported by the server) or not.
258      * 
259      * @param enable
260      */
261     public synchronized void enableCompression( boolean enable )
262     {
263         this._clientInfo.setCompression( enable );
264     }
265 
266     /***
267      * Tells this client to use compression (if supported by the server) with the given compression level.
268      * 
269      * @param compressionLevel the level of compression to use, must be in range of 0-9
270      */
271     public synchronized void enableCompression( int compressionLevel )
272     {
273         this._clientInfo.setCompression( true );
274         this._clientInfo.setCompressionLevel( compressionLevel );
275     }
276 
277     /***
278      * Enables/disables usage of a persistent connection. If persistent connection is disabled a new connection will be
279      * used for each request.
280      * 
281      * @param enable
282      */
283     public synchronized void enablePersistentConnection( boolean enable )
284     {
285         this._clientInfo.setPersistent( enable );
286     }
287 
288     /***
289      * Enables/disables usage of a persistent connection. If persistent connection is disabled a new connection will be
290      * used for each request.
291      * 
292      * @param enable
293      */
294     public synchronized void enableHttpPackaging( boolean enable )
295     {
296         this._clientInfo.setHttp( enable );
297     }
298 
299     /***
300      * @return the _inJVM
301      */
302     public boolean isInJVM()
303     {
304         return _inJVM;
305     }
306 
307     /***
308      * @param injvm the _inJVM to set
309      */
310     public synchronized void setInJVM( boolean injvm )
311     {
312         enableHttpPackaging( false );
313         _inJVM = injvm;
314     }
315 
316     /***
317      * Controls the used Adapter Strategy:
318      * <ul>
319      * <li>ADAPTER_STRATEGY_DEFAULT: both, client and server, will serialize and deserialize objects. The client will
320      * return a deserialized object</li>
321      * <li>ADAPTER_STRATEGY_DIRECT: both, client and server, will NOT serialize and deserialize objects. The client
322      * will expect ByteBuffers as arguments and the ServerHandler will get ByteBuffers too. The client will return
323      * ByteBuffers to the caller.</li>
324      * <li>ADAPTER_STRATEGY_MIXED: both, client and server, will exchange serialized objects. The ServerHandler will
325      * get deserialized objects too handle. The client will return a ByteBuffer instead of derserialized objects to the
326      * caller.</li>
327      * </ul>
328      * 
329      * @param adapterStrategy
330      */
331     public synchronized void setAdapterStrategy( int adapterStrategy )
332     {
333         switch ( adapterStrategy )
334         {
335             case EJConstants.ADAPTER_STRATEGY_DEFAULT:
336                 this._clientInfo.setIsDirect( false );
337                 this._clientInfo.setIsMixed( false );
338                 break;
339             case EJConstants.ADAPTER_STRATEGY_DIRECT:
340                 this._clientInfo.setIsDirect( true );
341                 this._clientInfo.setIsMixed( false );
342                 break;
343             case EJConstants.ADAPTER_STRATEGY_MIXED:
344                 this._clientInfo.setIsDirect( false );
345                 this._clientInfo.setIsMixed( true );
346                 break;
347             default:
348                 throw new IllegalArgumentException( "Unknown Adapter Strategy: " + adapterStrategy );
349         }
350     }
351 
352     /***
353      * Enables remote classloading on the default remote port.
354      */
355     public synchronized void enableRemoteClassloading()
356     {
357         if ( Thread.currentThread().getContextClassLoader() instanceof EJClassLoader )
358             throw new IllegalStateException( "Remote classloading already enabled!" );
359 
360         initClassLoader();
361     }
362 
363     /***
364      * Getter method to get direct access to the underlying ConnectionHeader (as required by the WSIF port
365      * implementation)
366      * 
367      * @return the used client configuration represented as ConnectionHeader
368      */
369     public ConnectionHeader getConnectionHeader()
370     {
371         return _clientInfo;
372     }
373 
374     /***
375      * Main entry point for client tier implementations. Handles all remote server calls... This method is threadsafe,
376      * ensuring that only one request is processed at a time.
377      * 
378      * @param obj input objects for the EJOE Server
379      * @return the object(s) returned by the EJOE server
380      */
381     public Object execute( final Object obj ) throws IOException
382     {
383         synchronized ( _mutex )
384         {
385             Object result = null;
386 
387             // ensure that only one request is in process at one time without
388             // blocking the whole client
389             ConnectionHeader clientInfo = this._clientInfo.copy();
390 
391             if ( _inJVM )
392             {
393                 InJvmProcessor processor = InJvmProcessorFactory.createProcessor( clientInfo );
394 
395                 try
396                 {
397                     result = processor.process( obj );
398                 }
399                 catch ( Exception e )
400                 {
401                     result = e;
402                 }
403             }
404             else
405             {
406                 SelectionKey selKey = null;
407 
408                 boolean error = false;
409 
410                 try
411                 {
412                     if ( this._channel == null || !this._channel.isOpen() )
413                     {
414                         log.log( Level.FINEST, "opening new connection" );
415                         openSocketChannel();
416                         _channel.register( this._selector, SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE );
417                     }
418                     else
419                     {
420                         log.log( Level.FINEST, "reusing persistent connection" );
421                         selKey = _channel.keyFor( this._selector );
422                         // register this channel again
423                         selKey = _channel.register( this._selector, 0 );
424                         selKey.interestOps( SelectionKey.OP_WRITE );
425                     }
426 
427                     Iterator it = null;
428 
429                     while ( this._selector.select( _connectionTimeout ) > 0 )
430                     {
431                         it = this._selector.selectedKeys().iterator();
432 
433                         while ( it.hasNext() )
434                         {
435                             selKey = (SelectionKey) it.next();
436                             it.remove();
437 
438                             if ( !selKey.isValid() )
439                             {
440                                 continue;
441                             }
442 
443                             try
444                             {
445                                 if ( selKey.isConnectable() && _channel.isConnectionPending() )
446                                 {
447                                     if ( !_channel.finishConnect() )
448                                     {
449                                         continue;
450                                     }
451 
452                                     clientInfo.setChannel( _channel );
453 
454                                     if ( log.isLoggable( Level.INFO ) )
455                                     {
456                                         log.log( Level.INFO, "Connection established to "
457                                                 + _channel.socket().getRemoteSocketAddress() );
458                                     }
459 
460                                     _channel.register( this._selector, SelectionKey.OP_WRITE );
461                                 }
462                                 if ( selKey.isWritable() )
463                                 {
464                                     if ( _serverInfo == null )
465                                     {
466                                         log.log( Level.FINEST, "Handshaking server..." );
467                                         _serverInfo = DataChannel.getInstance().handshake( clientInfo, _channel,
468                                                                                            _connectionTimeout );
469                                         if ( _serverInfo == null )
470                                         {
471                                             throw new IOException( "Connection timeout (" + _connectionTimeout
472                                                     + "ms) reached while waiting for handshake completing." );
473                                         }
474                                         else if ( clientInfo.isHttp() && !_serverInfo.isHttp() )
475                                         {
476                                             throw new UnsupportedOperationException(
477                                                                                      "The server does not permit usage of HTTP packaging!" );
478                                         }
479                                     }
480 
481                                     if ( _serverInfo.hasNonBlockingReadWrite() )
482                                     {
483                                         send( _serverInfo, clientInfo, selKey, obj );
484                                         _channel.register( this._selector, SelectionKey.OP_READ );
485                                     }
486                                     else
487                                     {
488                                         selKey.cancel();
489                                         _channel.configureBlocking( true );
490                                         result = processBlocked( clientInfo, obj );
491 
492                                         if ( result != null && (result instanceof Throwable) )
493                                         {
494                                             if ( !(result instanceof RemoteException) )
495                                             {
496                                                 throw new RemoteException(
497                                                                            "The server did return an Exception while handling your request!",
498                                                                            (Throwable) result );
499                                             }
500                                             else
501                                             {
502                                                 throw (RemoteException) result;
503                                             }
504                                         }
505 
506                                         return result;
507                                     }
508                                 }
509                                 else if ( selKey.isReadable() )
510                                 {
511                                     result = receive( clientInfo );
512                                     selKey.cancel();
513 
514                                     if ( result != null && (result instanceof Throwable) )
515                                     {
516                                         if ( !(result instanceof RemoteException) )
517                                         {
518                                             throw new RemoteException(
519                                                                        "The server did return an Exception while handling your request!",
520                                                                        (Throwable) result );
521                                         }
522                                         else
523                                         {
524                                             throw (RemoteException) result;
525                                         }
526                                     }
527 
528                                     return result;
529                                 }
530                             }
531                             catch ( IncompleteIOException ioe )
532                             {
533                                 // selKey.cancel();
534                                 if ( ioe.getIOBuffer() != null )
535                                 {
536                                     clientInfo.setWaitingBuffer( ioe.getIOBuffer() );
537                                     _channel.register( this._selector, ioe.getSelectionInterest() );
538                                 }
539                                 else
540                                 {
541                                     // rethrow origin exception to inform the caller
542                                     // without the hassle of nested exceptions
543                                     throw ioe;
544                                 }
545                             }
546                             catch ( ParseException e )
547                             {
548                                 error = true;
549                                 throw new IOException(
550                                                        "Connection closed due to unparseable connection header! Exact message was: "
551                                                                + e.getMessage() );
552                             }
553                             catch ( IOException e )
554                             {
555                                 if ( !(e instanceof RemoteException) )
556                                 {
557                                     error = true;
558                                 }
559                                 // rethrow origin exception to inform the caller
560                                 // without the hassle of nested exceptions
561                                 throw e;
562                             }
563                         }
564                     }
565                 }
566                 finally
567                 {
568                     selKey.cancel();
569                     // beeing a little bit paranoid is sometimes better
570                     clientInfo.releaseAttachment();
571                     clientInfo.releaseWaitingBuffer();
572                     if ( _serverInfo != null )
573                     {
574                         _serverInfo.releaseAttachment();
575                         _serverInfo.releaseWaitingBuffer();
576                     }
577 
578                     if ( error )
579                     {
580                         close();
581                     }
582                     else if ( !clientInfo.isPersistent() || _serverInfo == null || !_serverInfo.isPersistent() )
583                     {
584                         close();
585                     }
586                     else
587                     {
588                         // clean out cancelled keys
589                         this._selector.selectNow();
590                         if ( _channel.isBlocking() ) _channel.configureBlocking( false );
591                     }
592                 }
593             }
594 
595             if ( result != null && result instanceof Throwable )
596             {
597                 throw new RemoteException( "The server did return an Exception while handling your request!",
598                                            (Throwable) result );
599             }
600 
601             return result;
602         }
603     }
604 
605     /***
606      * Asynchrounous entry point for executing server invocations. This method works asynchrounous in that way, as it
607      * starts each invocation in a new thread.
608      * 
609      * @param obj input objects for the EJOE Server
610      * @param callback callback instance which will be notified if the request was processed or an an error occured
611      * @return an unique identifier for the started asynchrounous server invocation
612      */
613     public long executeAsync( final Object obj, final EJAsyncCallback callback )
614     {
615         long ident = _idGenerator.nextLong();
616         Thread t = new Thread( new EJAsyncWorker( this, obj, callback, ident ) );
617         t.setDaemon( true );
618         t.start();
619         return ident;
620     }
621 
622     /***
623      * Closes an existing persistent connection to the corresponding EJOE server. Invoking this method when using
624      * non-persistent connections has no effect.
625      */
626     public void close()
627     {
628         synchronized ( _mutex )
629         {
630             IOUtil.closeQuiet( this._selector );
631             IOUtil.closeQuiet( this._channel );
632             this._serverInfo = null;
633             this._channel = null;
634         }
635     }
636 
637     /***
638      * Opens a new socket connection to the EJServer
639      * 
640      * @return
641      * @throws IOException
642      */
643     private void openSocketChannel() throws IOException
644     {
645         this._channel = SocketChannel.open();
646         this._channel.configureBlocking( false );
647 
648         Socket socket = this._channel.socket();
649         try
650         {
651             socket.setSoTimeout( this._connectionTimeout );
652             socket.setReuseAddress( true );
653             socket.setSoLinger( false, 0 );
654             socket.setTrafficClass( EJConstants.IPTOS_THROUGHPUT );
655         }
656         catch ( Exception e )
657         {
658             // OK, can happen. Probably the current plattform respectively it's
659             // IP implementation doesn't allow setting these socket options
660         }
661 
662         this._selector = Selector.open();
663         // clean out cancelled keys
664         this._selector.selectNow();
665 
666         log.log( Level.INFO, "Opening new connection..." );
667 
668         this._channel.connect( new InetSocketAddress( _host, _port ) );
669     }
670 
671     /***
672      * Non-blocking request sending.
673      * 
674      * @param serverInfo ConnectionHeader received from the EJServer
675      * @param selKey
676      * @param obj the request
677      * @throws IOException
678      */
679     private void send( ConnectionHeader serverInfo, ConnectionHeader clientInfo, SelectionKey selKey, Object obj )
680             throws IOException
681     {
682         ByteBuffer dataBuf = (ByteBuffer) selKey.attachment();
683         DataChannel dataChannel = DataChannel.getInstance( serverInfo );
684         ByteBufferOutputStream out = null;
685 
686         try
687         {
688             if ( dataBuf == null )
689             {
690                 // usual way: serialize using a adapter
691                 if ( !clientInfo.isDirect() )
692                 {
693                     out = new ByteBufferOutputStream();
694                     serialize( out, obj, clientInfo );
695                     dataBuf = out.getBackingBuffer();
696                 }
697                 // direct mode: just use the attachement
698                 else
699                 {
700                     dataBuf = (ByteBuffer) obj;
701                 }
702 
703                 if ( dataBuf.position() > 0 )
704                 {
705                     dataBuf.flip();
706                 }
707 
708                 if ( log.isLoggable( Level.FINE ) )
709                 {
710                     byte[] tmp = new byte[dataBuf.remaining()];
711                     dataBuf.get( tmp );
712                     dataBuf.position( 0 );
713                     log.log( Level.FINE, "Going to send request..."
714                             + new String( tmp, EJConstants.EJOE_DEFAULT_CHARSET ) );
715                 }
716 
717                 dataChannel.writeHeader( serverInfo, dataBuf, this._connectionTimeout );
718             }
719 
720             dataChannel.nonBlockingWrite( _channel, dataBuf );
721             log.log( Level.FINE, "Request sent." );
722         }
723         finally
724         {
725             IOUtil.closeQuiet( out );
726         }
727     }
728 
729     /***
730      * Non-blocking response reading.
731      * 
732      * @param clientInfo client ConnectionHeader
733      * @return server response received by the corresponding EJServer
734      * @throws IOException
735      */
736     private Object receive( ConnectionHeader clientInfo ) throws IOException
737     {
738         Object result = null;
739         DataChannel dataChannel = DataChannel.getInstance( _serverInfo );
740         ByteBuffer dataBuf = clientInfo.getWaitingBuffer();
741         ByteBufferInputStream in = null;
742 
743         try
744         {
745             if ( dataBuf == null )
746             {
747                 int length = dataChannel.readHeader( _serverInfo, this._connectionTimeout );
748                 // server signals a null result?
749                 if ( length == 0 )
750                 {
751                     return null;
752                 }
753 
754                 // maybe the DataChannel signals that it has already read
755                 // partial data
756                 if ( _serverInfo.hasWaitingBuffer() )
757                 {
758                     dataBuf = _serverInfo.getWaitingBuffer();
759                 }
760                 else
761                 {
762                     dataBuf = ByteBufferAllocator.allocate( length );
763                 }
764 
765                 log.log( Level.FINE, "Going to read server response with length: " + length );
766             }
767 
768             if ( dataBuf.hasRemaining() )
769             {
770                 DataChannel.nonBlockingRead( _channel, dataBuf );
771             }
772 
773             dataBuf.flip();
774 
775             log
776                     .log( Level.FINE,
777                           "Response read. Now calling (de)serialize adapter to convert response back to object." );
778 
779             if ( dataBuf.hasRemaining() )
780             {
781                 try
782                 {
783                     // usual way: deserialize using a adapter
784                     if ( !clientInfo.isDirect() && !clientInfo.isMixed() )
785                     {
786                         in = new ByteBufferInputStream( dataBuf );
787                         result = deserialize( in, clientInfo );
788                     }
789                     // direct or mixed mode: don't deserialize, just copy and return
790                     // the read ByteBuffer
791                     else
792                     {
793                         // copy the bytebuffer
794                         ByteBuffer tmp = ByteBufferAllocator.allocate( dataBuf.remaining() );
795                         tmp.put( dataBuf );
796                         tmp.flip();
797                         result = tmp;
798                     }
799 
800                     log.log( Level.FINE, "Server response successfully converted to object." );
801                 }
802                 finally
803                 {
804                     clientInfo.releaseWaitingBuffer();
805                 }
806             }
807         }
808         finally
809         {
810             IOUtil.closeQuiet( in );
811         }
812 
813         return result;
814     }
815 
816     /***
817      * Blocking request sending
818      * 
819      * @param channel the connected SocketChannel
820      * @param obj the request
821      * @return server response received by the corresponding EJServer
822      * @throws IOException
823      */
824     private Object processBlocked( ConnectionHeader clientInfo, Object obj ) throws IOException
825     {
826         Object result = null;
827 
828         // usual way: (de)serialize using a adapter
829         if ( !clientInfo.isDirect() )
830         {
831             serialize( Channels.newOutputStream( _channel ), obj, clientInfo );
832             // default mode: deserialize the server response
833             if ( !clientInfo.isMixed() )
834             {
835                 result = deserialize( new ChannelInputStream( Channels.newInputStream( _channel ) ), clientInfo );
836             }
837             // mixed mode: don't deserialize the server response
838             else
839             {
840                 result = IOUtil.readDirect( new ChannelInputStream( Channels.newInputStream( _channel ) ) );
841             }
842         }
843         // direct mode: don't (de)serialize, just copy and return the read
844         // ByteBuffer
845         else
846         {
847             ByteBuffer tmp = (ByteBuffer) obj;
848             if ( tmp.position() > 0 )
849             {
850                 tmp.flip();
851             }
852             IOUtil.writeDirect( Channels.newOutputStream( _channel ), tmp );
853             result = IOUtil.readDirect( new ChannelInputStream( Channels.newInputStream( _channel ) ) );
854         }
855 
856         return result;
857     }
858 
859     /***
860      * Serializes a given request object through the given OutputStream
861      * 
862      * @param out the OutputStream to use
863      * @param obj the object to serialize
864      * @param serverInfo ConnectionHeader containing the connection data of the connected EJServer
865      * @throws IOException
866      */
867     private void serialize( OutputStream out, Object obj, ConnectionHeader clientInfo ) throws IOException
868     {
869         boolean compressed = !clientInfo.isHttp() && clientInfo.hasCompression() && _serverInfo.hasCompression();
870 
871         try
872         {
873             IOUtil.adapterSerialize( this._adapter, out, obj, compressed, clientInfo.getCompressionLevel() );
874         }
875         catch ( Exception e )
876         {
877             throw new RemoteException( "The client encountered an error while serializing your request data!", e );
878         }
879     }
880 
881     /***
882      * Deserializes a server response into an object from the given InputStream
883      * 
884      * @param in the InputStream to read the response from
885      * @param serverInfo ConnectionHeader containing the connection data of the connected EJServer
886      * @return a deserialized response object
887      * @throws IOException
888      */
889     private Object deserialize( InputStream in, ConnectionHeader clientInfo ) throws IOException
890     {
891         boolean compressed = clientInfo.hasCompression() && _serverInfo.hasCompression();
892 
893         try
894         {
895             return IOUtil.adapterDeserialize( this._adapter, in, compressed );
896         }
897         catch ( Exception e )
898         {
899             throw new RemoteException( "The client encountered an error while deserializing the server response!", e );
900         }
901     }
902 
903     /***
904      * Initializes a EJClassloader and sets it as the current context classloader. Also notifies the used
905      * SerializeAdapter to recognize the classloader change.
906      */
907     private void initClassLoader()
908     {
909         EJClassLoader ejcl = new EJClassLoader( Thread.currentThread().getContextClassLoader(), _host, _port );
910         Thread.currentThread().setContextClassLoader( ejcl );
911         this._adapter.handleClassLoaderChange( ejcl );
912     }
913 
914     /***
915      * Reads the settings for this client from a given Properties instance
916      * 
917      * @param props
918      */
919     private void loadProperties( Properties props )
920     {
921         String clazz = null;
922 
923         clazz = props.getProperty( "ejoe.adapter", EJConstants.EJOE_DEFAULT_ADAPTER.getName() );
924         _adapter = AdapterFactory.createAdapter( clazz );
925         this._clientInfo.setAdapterName( clazz );
926 
927         _host = props.getProperty( "ejoe.host", "127.0.0.1" );
928         _port = Integer.parseInt( props.getProperty( "ejoe.port", String.valueOf( EJConstants.EJOE_PORT ) ) );
929 
930         this._clientInfo.setHost( _host + ':' + _port );
931 
932         setConnectionTimeout( Integer.parseInt( props.getProperty( "ejoe.connectionTimeout", String
933                 .valueOf( EJConstants.EJOE_CONNECTION_TIMEOUT ) ) ) );
934 
935         enableCompression( Boolean.valueOf( props.getProperty( "ejoe.compression", "false" ) ).booleanValue() );
936 
937         enablePersistentConnection( Boolean.valueOf( props.getProperty( "ejoe.persistentConnection", "true" ) )
938                 .booleanValue() );
939 
940         enableHttpPackaging( Boolean.valueOf( props.getProperty( "ejoe.httpPackaging", "false" ) ).booleanValue() );
941 
942         if ( Boolean.valueOf( props.getProperty( "ejoe.remoteClassloader", "false" ) ).booleanValue() )
943         {
944             enableRemoteClassloading();
945         }
946 
947         if ( Boolean.valueOf( props.getProperty( "ejoe.inJVM", "false" ) ).booleanValue() )
948         {
949             setInJVM( true );
950         }
951 
952         int adapterStrategy = Integer.parseInt( props.getProperty( "ejoe.adapterStrategy", "0" ) );
953         setAdapterStrategy( adapterStrategy );
954     }
955 }