1 /**********************************************************************
2 * EJClient.java
3 * created on 08.08.2004 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/EJClient.java,v $
5 * $Date: 2007/11/17 10:59:41 $
6 * $Revision: 1.100 $
7 *
8 * ====================================================================
9 *
10 * Copyright 2005-2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the ejoe framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30
31 package de.netseeker.ejoe;
32
33 import java.io.FileInputStream;
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.io.OutputStream;
37 import java.io.Serializable;
38 import java.net.InetSocketAddress;
39 import java.net.Socket;
40 import java.nio.ByteBuffer;
41 import java.nio.channels.Channels;
42 import java.nio.channels.SelectionKey;
43 import java.nio.channels.Selector;
44 import java.nio.channels.SocketChannel;
45 import java.rmi.RemoteException;
46 import java.text.ParseException;
47 import java.util.Iterator;
48 import java.util.Properties;
49 import java.util.Random;
50 import java.util.logging.Level;
51 import java.util.logging.Logger;
52
53 import de.netseeker.ejoe.adapter.AdapterFactory;
54 import de.netseeker.ejoe.adapter.SerializeAdapter;
55 import de.netseeker.ejoe.cache.ByteBufferAllocator;
56 import de.netseeker.ejoe.core.InJvmProcessor;
57 import de.netseeker.ejoe.core.InJvmProcessorFactory;
58 import de.netseeker.ejoe.io.ByteBufferInputStream;
59 import de.netseeker.ejoe.io.ByteBufferOutputStream;
60 import de.netseeker.ejoe.io.ChannelInputStream;
61 import de.netseeker.ejoe.io.DataChannel;
62 import de.netseeker.ejoe.io.IOUtil;
63 import de.netseeker.ejoe.io.IncompleteIOException;
64
65 /***
66 * This is the client component of EJOE. You have to use this component to send and retrieve data to/from a EJOE server.
67 * <p>
68 * Basic usage:
69 *
70 * <pre>
71 * EJClient client = new EJClient("127.0.0.1", 12577); //you could also use EJConstants.EJOE_PORT
72 * Object response = client.execute("Hello World");
73 * ...
74 * </pre>
75 *
76 * </p>
77 * <p>
78 * Usage with persistent connections:
79 *
80 * <pre>
81 * EJClient client = new EJClient("127.0.0.1", 12577); //you could also use EJConstants.EJOE_PORT
82 * client.enablePersistentConnection(true); //request a persistent connection to the server
83 * Object response = client.execute("Hello World");
84 * ...
85 * reponse = client.execute("Bye World");
86 * //close the connection
87 * client.close();
88 * ...
89 * </pre>
90 *
91 * </p>
92 * <p>
93 * Usage with In-JVM EJServer:
94 *
95 * <pre>
96 * EJClient client = new EJClient("127.0.0.1", 12577); //you could also use EJConstants.EJOE_PORT
97 * client.setInJvm(true); //enable In-JVM mode, no socket connections will be used
98 * Object response = client.execute("Hello World");
99 * ...
100 * reponse = client.execute("Bye World");
101 * ...
102 * </pre>
103 *
104 * </p>
105 *
106 * @author netseeker aka Michael Manske
107 * @since 0.3.0
108 */
109 public class EJClient implements Serializable
110 {
111 private static final long serialVersionUID = 1L;
112
113 private transient static final Logger log = Logger.getLogger( EJClient.class.getName() );
114
115 private SerializeAdapter _adapter;
116
117 private String _host = "127.0.0.1";
118
119 private int _port = EJConstants.EJOE_PORT;
120
121 private int _connectionTimeout = EJConstants.EJOE_CONNECTION_TIMEOUT;
122
123 private boolean _inJVM = false;
124
125 private final ConnectionHeader _clientInfo = new ConnectionHeader( true );
126
127 private transient ConnectionHeader _serverInfo;
128
129 private transient SocketChannel _channel;
130
131 private transient Selector _selector;
132
133 private Object _mutex = new Object();
134
135 private static Random _idGenerator = new Random();
136
137 /***
138 * Creates an instance of the EJOE client pre-configured with settings from the global ejoe.properties file. You
139 * MUST provide such an property file on the classpath to use this constructor.
140 */
141 public EJClient()
142 {
143 Properties props = new Properties();
144 try
145 {
146 props.load( EJClient.class.getResourceAsStream( "/ejoe.properties" ) );
147 loadProperties( props );
148 }
149 catch ( IOException e )
150 {
151 log.log( Level.SEVERE, "ejoe.properties could not be read! "
152 + "Make sure you have placed a valid properties file in your classpath.", e );
153 throw new RuntimeException( e );
154 }
155 }
156
157 /***
158 * Creates an instance of the EJOE client pre-configured with settings from a global properties file.
159 *
160 * @param pathToConfigFile path to the properties file
161 */
162 public EJClient(String pathToConfigFile)
163 {
164 Properties props = new Properties();
165 FileInputStream fis = null;
166 try
167 {
168 fis = new FileInputStream( pathToConfigFile );
169 props.load( fis );
170 loadProperties( props );
171 }
172 catch ( IOException e )
173 {
174 log.log( Level.SEVERE, "ejoe.properties could not be read! "
175 + "Make sure you have placed a valid properties file in your classpath.", e );
176 throw new RuntimeException( e );
177 }
178 finally
179 {
180 IOUtil.closeQuiet( fis );
181 }
182 }
183
184 /***
185 * Creates an instance of the EJOE client pre-configured with settings from the given properties store.
186 *
187 * @param properties properties store containing EJClient settings
188 */
189 public EJClient(Properties properties)
190 {
191 loadProperties( properties );
192 }
193
194 /***
195 * Creates an instance of the EJOE client preconfigured to use an instance of
196 * de.netseeker.ejoe.adapter.ObjectStreamAdapter for (de)serializing.
197 *
198 * @param host address (dns name or ip address) of the EJOE server
199 * @param port port which the EJOE server listens to
200 */
201 public EJClient(String host, int port)
202 {
203 this._host = host;
204 this._port = port;
205 this._clientInfo.setHost( _host + ':' + _port );
206 this._adapter = AdapterFactory.createAdapter( EJConstants.EJOE_DEFAULT_ADAPTER.getName() );
207 this._clientInfo.setAdapterName( EJConstants.EJOE_DEFAULT_ADAPTER.getName() );
208 }
209
210 /***
211 * Creates an instance of the EJOE client.
212 *
213 * @param host address (dns name or ip address) of the EJOE server
214 * @param port port which the EJOE server listens to
215 * @param adapter the adapter used for (de)serializing input paramter objects for the server and the return values
216 */
217 public EJClient(String host, int port, final SerializeAdapter adapter)
218 {
219 this._host = host;
220 this._port = port;
221 this._clientInfo.setHost( _host + ':' + _port );
222 this._adapter = adapter;
223 this._clientInfo.setAdapterName( adapter.getClass().getName() );
224 }
225
226 /***
227 * Creates an instance of the EJOE client.
228 *
229 * @param host address (dns name or ip address) of the EJOE server
230 * @param port port which the EJOE server listens to
231 * @param adapter the adapter used for (de)serializing input paramter objects for the server and the return values
232 * @param isPersistent whether EJClient should use a persistent connection to the server or not
233 * @param isHttp whether EJClient should wrap requests in HTTP headers or not (lets EJClient socket data look like a
234 * HTTP 1.1 browser communication)
235 */
236 public EJClient(String host, int port, final SerializeAdapter adapter, boolean isPersistent, boolean isHttp,
237 boolean useCompression)
238 {
239 this( host, port, adapter );
240 enablePersistentConnection( isPersistent );
241 enableCompression( useCompression );
242 enableHttpPackaging( isHttp );
243 }
244
245 /***
246 * Sets the connection timeout used when waiting for server responses. A value of zero (the default) blocks
247 * indefinitely.
248 *
249 * @param timeout the new timeout in milliseconds
250 */
251 public synchronized void setConnectionTimeout( int timeout )
252 {
253 this._connectionTimeout = timeout;
254 }
255
256 /***
257 * Tells this client to use compression (if supported by the server) or not.
258 *
259 * @param enable
260 */
261 public synchronized void enableCompression( boolean enable )
262 {
263 this._clientInfo.setCompression( enable );
264 }
265
266 /***
267 * Tells this client to use compression (if supported by the server) with the given compression level.
268 *
269 * @param compressionLevel the level of compression to use, must be in range of 0-9
270 */
271 public synchronized void enableCompression( int compressionLevel )
272 {
273 this._clientInfo.setCompression( true );
274 this._clientInfo.setCompressionLevel( compressionLevel );
275 }
276
277 /***
278 * Enables/disables usage of a persistent connection. If persistent connection is disabled a new connection will be
279 * used for each request.
280 *
281 * @param enable
282 */
283 public synchronized void enablePersistentConnection( boolean enable )
284 {
285 this._clientInfo.setPersistent( enable );
286 }
287
288 /***
289 * Enables/disables usage of a persistent connection. If persistent connection is disabled a new connection will be
290 * used for each request.
291 *
292 * @param enable
293 */
294 public synchronized void enableHttpPackaging( boolean enable )
295 {
296 this._clientInfo.setHttp( enable );
297 }
298
299 /***
300 * @return the _inJVM
301 */
302 public boolean isInJVM()
303 {
304 return _inJVM;
305 }
306
307 /***
308 * @param injvm the _inJVM to set
309 */
310 public synchronized void setInJVM( boolean injvm )
311 {
312 enableHttpPackaging( false );
313 _inJVM = injvm;
314 }
315
316 /***
317 * Controls the used Adapter Strategy:
318 * <ul>
319 * <li>ADAPTER_STRATEGY_DEFAULT: both, client and server, will serialize and deserialize objects. The client will
320 * return a deserialized object</li>
321 * <li>ADAPTER_STRATEGY_DIRECT: both, client and server, will NOT serialize and deserialize objects. The client
322 * will expect ByteBuffers as arguments and the ServerHandler will get ByteBuffers too. The client will return
323 * ByteBuffers to the caller.</li>
324 * <li>ADAPTER_STRATEGY_MIXED: both, client and server, will exchange serialized objects. The ServerHandler will
325 * get deserialized objects too handle. The client will return a ByteBuffer instead of derserialized objects to the
326 * caller.</li>
327 * </ul>
328 *
329 * @param adapterStrategy
330 */
331 public synchronized void setAdapterStrategy( int adapterStrategy )
332 {
333 switch ( adapterStrategy )
334 {
335 case EJConstants.ADAPTER_STRATEGY_DEFAULT:
336 this._clientInfo.setIsDirect( false );
337 this._clientInfo.setIsMixed( false );
338 break;
339 case EJConstants.ADAPTER_STRATEGY_DIRECT:
340 this._clientInfo.setIsDirect( true );
341 this._clientInfo.setIsMixed( false );
342 break;
343 case EJConstants.ADAPTER_STRATEGY_MIXED:
344 this._clientInfo.setIsDirect( false );
345 this._clientInfo.setIsMixed( true );
346 break;
347 default:
348 throw new IllegalArgumentException( "Unknown Adapter Strategy: " + adapterStrategy );
349 }
350 }
351
352 /***
353 * Enables remote classloading on the default remote port.
354 */
355 public synchronized void enableRemoteClassloading()
356 {
357 if ( Thread.currentThread().getContextClassLoader() instanceof EJClassLoader )
358 throw new IllegalStateException( "Remote classloading already enabled!" );
359
360 initClassLoader();
361 }
362
363 /***
364 * Getter method to get direct access to the underlying ConnectionHeader (as required by the WSIF port
365 * implementation)
366 *
367 * @return the used client configuration represented as ConnectionHeader
368 */
369 public ConnectionHeader getConnectionHeader()
370 {
371 return _clientInfo;
372 }
373
374 /***
375 * Main entry point for client tier implementations. Handles all remote server calls... This method is threadsafe,
376 * ensuring that only one request is processed at a time.
377 *
378 * @param obj input objects for the EJOE Server
379 * @return the object(s) returned by the EJOE server
380 */
381 public Object execute( final Object obj ) throws IOException
382 {
383 synchronized ( _mutex )
384 {
385 Object result = null;
386
387
388
389 ConnectionHeader clientInfo = this._clientInfo.copy();
390
391 if ( _inJVM )
392 {
393 InJvmProcessor processor = InJvmProcessorFactory.createProcessor( clientInfo );
394
395 try
396 {
397 result = processor.process( obj );
398 }
399 catch ( Exception e )
400 {
401 result = e;
402 }
403 }
404 else
405 {
406 SelectionKey selKey = null;
407
408 boolean error = false;
409
410 try
411 {
412 if ( this._channel == null || !this._channel.isOpen() )
413 {
414 log.log( Level.FINEST, "opening new connection" );
415 openSocketChannel();
416 _channel.register( this._selector, SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE );
417 }
418 else
419 {
420 log.log( Level.FINEST, "reusing persistent connection" );
421 selKey = _channel.keyFor( this._selector );
422
423 selKey = _channel.register( this._selector, 0 );
424 selKey.interestOps( SelectionKey.OP_WRITE );
425 }
426
427 Iterator it = null;
428
429 while ( this._selector.select( _connectionTimeout ) > 0 )
430 {
431 it = this._selector.selectedKeys().iterator();
432
433 while ( it.hasNext() )
434 {
435 selKey = (SelectionKey) it.next();
436 it.remove();
437
438 if ( !selKey.isValid() )
439 {
440 continue;
441 }
442
443 try
444 {
445 if ( selKey.isConnectable() && _channel.isConnectionPending() )
446 {
447 if ( !_channel.finishConnect() )
448 {
449 continue;
450 }
451
452 clientInfo.setChannel( _channel );
453
454 if ( log.isLoggable( Level.INFO ) )
455 {
456 log.log( Level.INFO, "Connection established to "
457 + _channel.socket().getRemoteSocketAddress() );
458 }
459
460 _channel.register( this._selector, SelectionKey.OP_WRITE );
461 }
462 if ( selKey.isWritable() )
463 {
464 if ( _serverInfo == null )
465 {
466 log.log( Level.FINEST, "Handshaking server..." );
467 _serverInfo = DataChannel.getInstance().handshake( clientInfo, _channel,
468 _connectionTimeout );
469 if ( _serverInfo == null )
470 {
471 throw new IOException( "Connection timeout (" + _connectionTimeout
472 + "ms) reached while waiting for handshake completing." );
473 }
474 else if ( clientInfo.isHttp() && !_serverInfo.isHttp() )
475 {
476 throw new UnsupportedOperationException(
477 "The server does not permit usage of HTTP packaging!" );
478 }
479 }
480
481 if ( _serverInfo.hasNonBlockingReadWrite() )
482 {
483 send( _serverInfo, clientInfo, selKey, obj );
484 _channel.register( this._selector, SelectionKey.OP_READ );
485 }
486 else
487 {
488 selKey.cancel();
489 _channel.configureBlocking( true );
490 result = processBlocked( clientInfo, obj );
491
492 if ( result != null && (result instanceof Throwable) )
493 {
494 if ( !(result instanceof RemoteException) )
495 {
496 throw new RemoteException(
497 "The server did return an Exception while handling your request!",
498 (Throwable) result );
499 }
500 else
501 {
502 throw (RemoteException) result;
503 }
504 }
505
506 return result;
507 }
508 }
509 else if ( selKey.isReadable() )
510 {
511 result = receive( clientInfo );
512 selKey.cancel();
513
514 if ( result != null && (result instanceof Throwable) )
515 {
516 if ( !(result instanceof RemoteException) )
517 {
518 throw new RemoteException(
519 "The server did return an Exception while handling your request!",
520 (Throwable) result );
521 }
522 else
523 {
524 throw (RemoteException) result;
525 }
526 }
527
528 return result;
529 }
530 }
531 catch ( IncompleteIOException ioe )
532 {
533
534 if ( ioe.getIOBuffer() != null )
535 {
536 clientInfo.setWaitingBuffer( ioe.getIOBuffer() );
537 _channel.register( this._selector, ioe.getSelectionInterest() );
538 }
539 else
540 {
541
542
543 throw ioe;
544 }
545 }
546 catch ( ParseException e )
547 {
548 error = true;
549 throw new IOException(
550 "Connection closed due to unparseable connection header! Exact message was: "
551 + e.getMessage() );
552 }
553 catch ( IOException e )
554 {
555 if ( !(e instanceof RemoteException) )
556 {
557 error = true;
558 }
559
560
561 throw e;
562 }
563 }
564 }
565 }
566 finally
567 {
568 selKey.cancel();
569
570 clientInfo.releaseAttachment();
571 clientInfo.releaseWaitingBuffer();
572 if ( _serverInfo != null )
573 {
574 _serverInfo.releaseAttachment();
575 _serverInfo.releaseWaitingBuffer();
576 }
577
578 if ( error )
579 {
580 close();
581 }
582 else if ( !clientInfo.isPersistent() || _serverInfo == null || !_serverInfo.isPersistent() )
583 {
584 close();
585 }
586 else
587 {
588
589 this._selector.selectNow();
590 if ( _channel.isBlocking() ) _channel.configureBlocking( false );
591 }
592 }
593 }
594
595 if ( result != null && result instanceof Throwable )
596 {
597 throw new RemoteException( "The server did return an Exception while handling your request!",
598 (Throwable) result );
599 }
600
601 return result;
602 }
603 }
604
605 /***
606 * Asynchrounous entry point for executing server invocations. This method works asynchrounous in that way, as it
607 * starts each invocation in a new thread.
608 *
609 * @param obj input objects for the EJOE Server
610 * @param callback callback instance which will be notified if the request was processed or an an error occured
611 * @return an unique identifier for the started asynchrounous server invocation
612 */
613 public long executeAsync( final Object obj, final EJAsyncCallback callback )
614 {
615 long ident = _idGenerator.nextLong();
616 Thread t = new Thread( new EJAsyncWorker( this, obj, callback, ident ) );
617 t.setDaemon( true );
618 t.start();
619 return ident;
620 }
621
622 /***
623 * Closes an existing persistent connection to the corresponding EJOE server. Invoking this method when using
624 * non-persistent connections has no effect.
625 */
626 public void close()
627 {
628 synchronized ( _mutex )
629 {
630 IOUtil.closeQuiet( this._selector );
631 IOUtil.closeQuiet( this._channel );
632 this._serverInfo = null;
633 this._channel = null;
634 }
635 }
636
637 /***
638 * Opens a new socket connection to the EJServer
639 *
640 * @return
641 * @throws IOException
642 */
643 private void openSocketChannel() throws IOException
644 {
645 this._channel = SocketChannel.open();
646 this._channel.configureBlocking( false );
647
648 Socket socket = this._channel.socket();
649 try
650 {
651 socket.setSoTimeout( this._connectionTimeout );
652 socket.setReuseAddress( true );
653 socket.setSoLinger( false, 0 );
654 socket.setTrafficClass( EJConstants.IPTOS_THROUGHPUT );
655 }
656 catch ( Exception e )
657 {
658
659
660 }
661
662 this._selector = Selector.open();
663
664 this._selector.selectNow();
665
666 log.log( Level.INFO, "Opening new connection..." );
667
668 this._channel.connect( new InetSocketAddress( _host, _port ) );
669 }
670
671 /***
672 * Non-blocking request sending.
673 *
674 * @param serverInfo ConnectionHeader received from the EJServer
675 * @param selKey
676 * @param obj the request
677 * @throws IOException
678 */
679 private void send( ConnectionHeader serverInfo, ConnectionHeader clientInfo, SelectionKey selKey, Object obj )
680 throws IOException
681 {
682 ByteBuffer dataBuf = (ByteBuffer) selKey.attachment();
683 DataChannel dataChannel = DataChannel.getInstance( serverInfo );
684 ByteBufferOutputStream out = null;
685
686 try
687 {
688 if ( dataBuf == null )
689 {
690
691 if ( !clientInfo.isDirect() )
692 {
693 out = new ByteBufferOutputStream();
694 serialize( out, obj, clientInfo );
695 dataBuf = out.getBackingBuffer();
696 }
697
698 else
699 {
700 dataBuf = (ByteBuffer) obj;
701 }
702
703 if ( dataBuf.position() > 0 )
704 {
705 dataBuf.flip();
706 }
707
708 if ( log.isLoggable( Level.FINE ) )
709 {
710 byte[] tmp = new byte[dataBuf.remaining()];
711 dataBuf.get( tmp );
712 dataBuf.position( 0 );
713 log.log( Level.FINE, "Going to send request..."
714 + new String( tmp, EJConstants.EJOE_DEFAULT_CHARSET ) );
715 }
716
717 dataChannel.writeHeader( serverInfo, dataBuf, this._connectionTimeout );
718 }
719
720 dataChannel.nonBlockingWrite( _channel, dataBuf );
721 log.log( Level.FINE, "Request sent." );
722 }
723 finally
724 {
725 IOUtil.closeQuiet( out );
726 }
727 }
728
729 /***
730 * Non-blocking response reading.
731 *
732 * @param clientInfo client ConnectionHeader
733 * @return server response received by the corresponding EJServer
734 * @throws IOException
735 */
736 private Object receive( ConnectionHeader clientInfo ) throws IOException
737 {
738 Object result = null;
739 DataChannel dataChannel = DataChannel.getInstance( _serverInfo );
740 ByteBuffer dataBuf = clientInfo.getWaitingBuffer();
741 ByteBufferInputStream in = null;
742
743 try
744 {
745 if ( dataBuf == null )
746 {
747 int length = dataChannel.readHeader( _serverInfo, this._connectionTimeout );
748
749 if ( length == 0 )
750 {
751 return null;
752 }
753
754
755
756 if ( _serverInfo.hasWaitingBuffer() )
757 {
758 dataBuf = _serverInfo.getWaitingBuffer();
759 }
760 else
761 {
762 dataBuf = ByteBufferAllocator.allocate( length );
763 }
764
765 log.log( Level.FINE, "Going to read server response with length: " + length );
766 }
767
768 if ( dataBuf.hasRemaining() )
769 {
770 DataChannel.nonBlockingRead( _channel, dataBuf );
771 }
772
773 dataBuf.flip();
774
775 log
776 .log( Level.FINE,
777 "Response read. Now calling (de)serialize adapter to convert response back to object." );
778
779 if ( dataBuf.hasRemaining() )
780 {
781 try
782 {
783
784 if ( !clientInfo.isDirect() && !clientInfo.isMixed() )
785 {
786 in = new ByteBufferInputStream( dataBuf );
787 result = deserialize( in, clientInfo );
788 }
789
790
791 else
792 {
793
794 ByteBuffer tmp = ByteBufferAllocator.allocate( dataBuf.remaining() );
795 tmp.put( dataBuf );
796 tmp.flip();
797 result = tmp;
798 }
799
800 log.log( Level.FINE, "Server response successfully converted to object." );
801 }
802 finally
803 {
804 clientInfo.releaseWaitingBuffer();
805 }
806 }
807 }
808 finally
809 {
810 IOUtil.closeQuiet( in );
811 }
812
813 return result;
814 }
815
816 /***
817 * Blocking request sending
818 *
819 * @param channel the connected SocketChannel
820 * @param obj the request
821 * @return server response received by the corresponding EJServer
822 * @throws IOException
823 */
824 private Object processBlocked( ConnectionHeader clientInfo, Object obj ) throws IOException
825 {
826 Object result = null;
827
828
829 if ( !clientInfo.isDirect() )
830 {
831 serialize( Channels.newOutputStream( _channel ), obj, clientInfo );
832
833 if ( !clientInfo.isMixed() )
834 {
835 result = deserialize( new ChannelInputStream( Channels.newInputStream( _channel ) ), clientInfo );
836 }
837
838 else
839 {
840 result = IOUtil.readDirect( new ChannelInputStream( Channels.newInputStream( _channel ) ) );
841 }
842 }
843
844
845 else
846 {
847 ByteBuffer tmp = (ByteBuffer) obj;
848 if ( tmp.position() > 0 )
849 {
850 tmp.flip();
851 }
852 IOUtil.writeDirect( Channels.newOutputStream( _channel ), tmp );
853 result = IOUtil.readDirect( new ChannelInputStream( Channels.newInputStream( _channel ) ) );
854 }
855
856 return result;
857 }
858
859 /***
860 * Serializes a given request object through the given OutputStream
861 *
862 * @param out the OutputStream to use
863 * @param obj the object to serialize
864 * @param serverInfo ConnectionHeader containing the connection data of the connected EJServer
865 * @throws IOException
866 */
867 private void serialize( OutputStream out, Object obj, ConnectionHeader clientInfo ) throws IOException
868 {
869 boolean compressed = !clientInfo.isHttp() && clientInfo.hasCompression() && _serverInfo.hasCompression();
870
871 try
872 {
873 IOUtil.adapterSerialize( this._adapter, out, obj, compressed, clientInfo.getCompressionLevel() );
874 }
875 catch ( Exception e )
876 {
877 throw new RemoteException( "The client encountered an error while serializing your request data!", e );
878 }
879 }
880
881 /***
882 * Deserializes a server response into an object from the given InputStream
883 *
884 * @param in the InputStream to read the response from
885 * @param serverInfo ConnectionHeader containing the connection data of the connected EJServer
886 * @return a deserialized response object
887 * @throws IOException
888 */
889 private Object deserialize( InputStream in, ConnectionHeader clientInfo ) throws IOException
890 {
891 boolean compressed = clientInfo.hasCompression() && _serverInfo.hasCompression();
892
893 try
894 {
895 return IOUtil.adapterDeserialize( this._adapter, in, compressed );
896 }
897 catch ( Exception e )
898 {
899 throw new RemoteException( "The client encountered an error while deserializing the server response!", e );
900 }
901 }
902
903 /***
904 * Initializes a EJClassloader and sets it as the current context classloader. Also notifies the used
905 * SerializeAdapter to recognize the classloader change.
906 */
907 private void initClassLoader()
908 {
909 EJClassLoader ejcl = new EJClassLoader( Thread.currentThread().getContextClassLoader(), _host, _port );
910 Thread.currentThread().setContextClassLoader( ejcl );
911 this._adapter.handleClassLoaderChange( ejcl );
912 }
913
914 /***
915 * Reads the settings for this client from a given Properties instance
916 *
917 * @param props
918 */
919 private void loadProperties( Properties props )
920 {
921 String clazz = null;
922
923 clazz = props.getProperty( "ejoe.adapter", EJConstants.EJOE_DEFAULT_ADAPTER.getName() );
924 _adapter = AdapterFactory.createAdapter( clazz );
925 this._clientInfo.setAdapterName( clazz );
926
927 _host = props.getProperty( "ejoe.host", "127.0.0.1" );
928 _port = Integer.parseInt( props.getProperty( "ejoe.port", String.valueOf( EJConstants.EJOE_PORT ) ) );
929
930 this._clientInfo.setHost( _host + ':' + _port );
931
932 setConnectionTimeout( Integer.parseInt( props.getProperty( "ejoe.connectionTimeout", String
933 .valueOf( EJConstants.EJOE_CONNECTION_TIMEOUT ) ) ) );
934
935 enableCompression( Boolean.valueOf( props.getProperty( "ejoe.compression", "false" ) ).booleanValue() );
936
937 enablePersistentConnection( Boolean.valueOf( props.getProperty( "ejoe.persistentConnection", "true" ) )
938 .booleanValue() );
939
940 enableHttpPackaging( Boolean.valueOf( props.getProperty( "ejoe.httpPackaging", "false" ) ).booleanValue() );
941
942 if ( Boolean.valueOf( props.getProperty( "ejoe.remoteClassloader", "false" ) ).booleanValue() )
943 {
944 enableRemoteClassloading();
945 }
946
947 if ( Boolean.valueOf( props.getProperty( "ejoe.inJVM", "false" ) ).booleanValue() )
948 {
949 setInJVM( true );
950 }
951
952 int adapterStrategy = Integer.parseInt( props.getProperty( "ejoe.adapterStrategy", "0" ) );
953 setAdapterStrategy( adapterStrategy );
954 }
955 }