1 /**********************************************************************
2 * EJClient.java
3 * created on 08.08.2004 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/EJClient.java,v $
5 * $Date: 2006/02/04 15:17:45 $
6 * $Revision: 1.58 $
7 *********************************************************************/
8
9 package de.netseeker.ejoe;
10
11 import java.io.FileInputStream;
12 import java.io.IOException;
13 import java.io.InputStream;
14 import java.io.OutputStream;
15 import java.io.Serializable;
16 import java.net.InetSocketAddress;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.Channels;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.Selector;
21 import java.nio.channels.SocketChannel;
22 import java.rmi.RemoteException;
23 import java.util.Iterator;
24 import java.util.Properties;
25 import java.util.logging.Level;
26 import java.util.logging.Logger;
27
28 import de.netseeker.ejoe.adapter.ObjectStreamAdapter;
29 import de.netseeker.ejoe.adapter.SerializeAdapter;
30 import de.netseeker.ejoe.adapter.XStreamAdapter;
31 import de.netseeker.ejoe.io.ByteBufferInputStream;
32 import de.netseeker.ejoe.io.ByteBufferOutputStream;
33 import de.netseeker.ejoe.io.IOUtils;
34 import de.netseeker.ejoe.io.IncompleteIOException;
35
/**
 * This is the client component of EJOE. Use this component to send data to, and retrieve data from, an EJOE server.
 *
 * @author netseeker aka Michael Manske
 */
41 public class EJClient implements Serializable
42 {
/** Serialization version identifier of this Serializable class. */
private static final long serialVersionUID = 3258413932539359280L;

/** Logger named after this class. */
private static final Logger log = Logger.getLogger(EJClient.class.getName());

/** Adapter used to serialize requests and to deserialize responses. */
private SerializeAdapter _adapter;

/** Host the client connects to; the loopback address unless configured otherwise. */
private String _host = "127.0.0.1";

/** Port the client connects to on {@link #_host}. */
private int _port = EJConstants.EJOE_PORT;

/** Port handed to the EJClassLoader when remote classloading is enabled. */
private int _classLoaderPort = EJConstants.EJOE_CLASSLOADER_PORT;

/** Timeout in milliseconds passed to the selector and to the handshake during a request. */
private int _connectionTimeout = EJConstants.EJOE_CONNECTION_TIMEOUT;

/** Header describing this client (adapter name, compression, persistence); sent during the handshake. */
private final ConnectionHeader _clientInfo = new ConnectionHeader();

/**
 * Channel consulted by openSocketChannel() and released by close().
 * NOTE(review): no code in this class assigns this field, so it appears to stay null - confirm.
 */
private SocketChannel _channel;

/** True while execute() is running; the enable* setters refuse to change settings while it is set. */
private boolean _requestInProgress = false;
62
63 /***
64 * Creates an instance of the EJOE client pre-configured with settings from the global ejoe.properties file. You
65 * MUST provide such an property file to use this constructor.
66 */
67 public EJClient()
68 {
69 Properties props = new Properties();
70 try
71 {
72 props.load(EJClient.class.getResourceAsStream("/ejoe.properties"));
73 loadProperties(props);
74 }
75 catch (IOException e)
76 {
77 log
78 .log(
79 Level.SEVERE,
80 "ejoe.properties could not be read! Make sure you have placed a valid properties file in your classpath.",
81 e);
82 throw new RuntimeException(e);
83 }
84 }
85
86 /***
87 * Creates an instance of the EJOE client pre-configured with settings from a global properties file.
88 *
89 * @param pathToConfigFile
90 * path to the properties file
91 */
92 public EJClient(String pathToConfigFile)
93 {
94 Properties props = new Properties();
95 try
96 {
97 props.load(new FileInputStream(pathToConfigFile));
98 loadProperties(props);
99 }
100 catch (IOException e)
101 {
102 log
103 .log(
104 Level.SEVERE,
105 "ejoe.properties could not be read! Make sure you have placed a valid properties file in your classpath.",
106 e);
107 throw new RuntimeException(e);
108 }
109 }
110
/**
 * Creates an instance of the EJOE client that uses a new de.netseeker.ejoe.adapter.XStreamAdapter for
 * (de)serializing and announces that adapter's class name in the client header.
 *
 * @param host
 *            address (dns name or ip address) of the EJOE server
 * @param port
 *            port which the EJOE server listens to
 */
public EJClient(String host, int port)
{
    _host = host;
    _port = port;

    // the adapter instance and the advertised adapter name always refer to the same class
    _adapter = new XStreamAdapter();
    _clientInfo.setAdapterName(XStreamAdapter.class.getName());
}
127
128 /***
129 * Creates an instance of the EJOE client.
130 *
131 * @param host
132 * address (dns name or ip address) of the EJOE server
133 * @param port
134 * port which the EJOE server listens to
135 * @param adapter
136 * the adapter used for (de)serializing input paramter objects for the server and the return values
137 */
138 public EJClient(String host, int port, final SerializeAdapter adapter)
139 {
140 this._host = host;
141 this._port = port;
142 this._adapter = adapter;
143 this._clientInfo.setAdapterName(adapter.getClass().getName());
144 }
145
/**
 * Creates an instance of the EJOE client with remote classloading enabled.
 *
 * @param host
 *            address (dns name or ip address) of the EJOE server
 * @param port
 *            port which the EJOE server listens to
 * @param classLoaderPort
 *            port which the EJOE class loader server listens to
 * @param adapter
 *            the adapter used for (de)serializing input parameter objects for the server and the return values
 */
public EJClient(String host, int port, int classLoaderPort, final SerializeAdapter adapter)
{
    this(host, port, adapter);
    // must be stored before initClassLoader() runs, because the class loader is created from this field;
    // previously the argument was dropped and the default port was always used
    this._classLoaderPort = classLoaderPort;
    initClassLoader();
}
163
164 /***
165 * Sets the connection timeout used when waiting for server responses. A value of zero (the default) blocks
166 * indefinitely.
167 *
168 * @param timeout
169 * the new timeout in milliseconds
170 */
171 public synchronized void setConnectionTimeout(int timeout)
172 {
173 this._connectionTimeout = timeout;
174 }
175
176 /***
177 * Tells this client to use compression (if supported by the server) or not.
178 *
179 * @param enable
180 */
181 public synchronized void enableCompression(boolean enable)
182 {
183 if (!this._requestInProgress)
184 {
185 this._clientInfo.setCompression(enable);
186 }
187 else
188 {
189 throw new IllegalStateException("Compression handling can't be changed while a request is in progress.");
190 }
191 }
192
193 /***
194 * Tells this client to use compression (if supported by the server) with the given compression level.
195 *
196 * @param compressionLevel
197 * the level of compression to use, must be in range of 0-9
198 */
199 public synchronized void enableCompression(int compressionLevel)
200 {
201 if (!this._requestInProgress)
202 {
203 this._clientInfo.setCompression(true);
204 this._clientInfo.setCompressionLevel(compressionLevel);
205 }
206 else
207 {
208 throw new IllegalStateException("Compression handling can't be changed while a request is in progress.");
209 }
210 }
211
212 /***
213 * Enables/disables usage of a persistent connection. If persistent connection is disabled a new connection will be
214 * used for each request.
215 *
216 * @param enable
217 */
218 public synchronized void enablePersistentConnection(boolean enable)
219 {
220 if (!this._requestInProgress)
221 {
222 this._clientInfo.setPersistent(enable);
223 }
224 else
225 {
226 throw new IllegalStateException("Connection handling can't be changed while a request is in progress.");
227 }
228 }
229
230 /***
231 * Enables remote classloading on the given remote port
232 *
233 * @param port
234 * port on which the EJOE classloader server listens
235 */
236 public synchronized void enableRemoteClassloading(int port)
237 {
238 if (Thread.currentThread().getContextClassLoader() instanceof EJClassLoader)
239 throw new IllegalStateException("Remote classloading already enabled!");
240
241 this._classLoaderPort = port;
242 initClassLoader();
243 }
244
245 /***
246 * Enables remote classloading on the default remote port.
247 */
248 public synchronized void enableRemoteClassloading()
249 {
250 if (Thread.currentThread().getContextClassLoader() instanceof EJClassLoader)
251 throw new IllegalStateException("Remote classloading already enabled!");
252
253 initClassLoader();
254 }
255
256 /***
257 * Main entry point for client tier implementations. Handles all remote server calls... This method is threadsafe,
258 * ensuring that only one request is processed at a time.
259 *
260 * @param obj
261 * input objects for the EJOE Server
262 * @return the object(s) returned by the EJOE server
263 */
264 public synchronized Object execute(Object obj) throws IOException
265 {
266 Object result = null;
267
268
269
270 this._requestInProgress = true;
271 ConnectionHeader serverInfo = null;
272 SocketChannel sChannel = null;
273 Selector selector = Selector.open();
274
275 try
276 {
277 sChannel = openSocketChannel();
278 sChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE);
279 Iterator it = null;
280 SelectionKey selKey = null;
281
282 while (selector.select(_connectionTimeout) > 0)
283 {
284 it = selector.selectedKeys().iterator();
285
286 while (it.hasNext())
287 {
288 selKey = (SelectionKey) it.next();
289 it.remove();
290
291 if (!selKey.isValid())
292 {
293 continue;
294 }
295
296 try
297 {
298 if (selKey.isConnectable() && sChannel.isConnectionPending())
299 {
300 if (!sChannel.finishConnect())
301 {
302 continue;
303 }
304
305 log.log(Level.INFO, "Connection established to "
306 + sChannel.socket().getRemoteSocketAddress());
307 }
308 if (selKey.isWritable())
309 {
310 if (serverInfo == null)
311 {
312 log.log(Level.FINEST, "Handshaking server...");
313 serverInfo = IOUtils.handshake(sChannel, _clientInfo, true, _connectionTimeout);
314 if (serverInfo == null)
315 {
316 throw new IOException("Connection timeout (" + _connectionTimeout
317 + "ms) reached while waiting for Handshake completing.");
318 }
319 }
320
321 if (serverInfo.hasNonBlockingReadWrite())
322 {
323 send(serverInfo, selKey, obj);
324 sChannel.register(selector, SelectionKey.OP_READ);
325 }
326 else
327 {
328 selKey.cancel();
329 sChannel.configureBlocking(true);
330 result = processBlocked(serverInfo, sChannel, obj);
331 return result;
332 }
333 }
334 if (selKey.isReadable())
335 {
336 result = receive(serverInfo, selKey);
337 return result;
338 }
339 }
340 catch (IncompleteIOException ioe)
341 {
342 selKey.cancel();
343 selector.selectNow();
344 sChannel.register(selector, ioe.getSelectionInterest(), ioe.getIOBuffer());
345 }
346 catch (IOException e)
347 {
348 selKey.cancel();
349 IOUtils.closeQuite(sChannel);
350
351
352 throw e;
353 }
354 }
355 }
356 }
357 finally
358 {
359 this._requestInProgress = false;
360 IOUtils.closeQuite(selector);
361
362 if (!this._clientInfo.isPersistent() || !serverInfo.isPersistent())
363 {
364 IOUtils.closeQuite(sChannel);
365 }
366 }
367
368 if (result instanceof Throwable)
369 {
370 throw new RemoteException("The server did return an Exception while handling your request!",
371 (Throwable) result);
372 }
373
374 return result;
375 }
376
377 /***
378 * Closes an existing persistent connection to the corresponding EJOE server. Invoking this method when
379 * non-persistent connections are used has no effect.
380 */
381 public void close()
382 {
383 if (this._channel != null)
384 {
385 IOUtils.closeQuite(this._channel);
386 this._channel = null;
387 }
388 }
389
390 /***
391 * Opens a new socket connection to the EJServer
392 *
393 * @return
394 * @throws IOException
395 */
396 private SocketChannel openSocketChannel() throws IOException
397 {
398 SocketChannel sChannel = this._channel;
399
400 if (sChannel == null || !sChannel.isOpen() || !this._clientInfo.isPersistent())
401 {
402 sChannel = SocketChannel.open();
403 sChannel.configureBlocking(false);
404 sChannel.connect(new InetSocketAddress(_host, _port));
405 }
406
407 return sChannel;
408 }
409
410 /***
411 * Non-blocking request sending.
412 *
413 * @param serverInfo ConnectionHeader received from the EJServer
414 * @param selKey
415 * @param obj the request
416 * @throws IOException
417 */
418 private void send(ConnectionHeader serverInfo, SelectionKey selKey, Object obj) throws IOException
419 {
420 SocketChannel channel = (SocketChannel) selKey.channel();
421 ByteBuffer dataBuf = (ByteBuffer) selKey.attachment();
422 ByteBufferOutputStream out = null;
423
424 try
425 {
426 if (dataBuf == null)
427 {
428 out = new ByteBufferOutputStream();
429 serialize(out, obj, serverInfo);
430 dataBuf = out.getBackingBuffer();
431 dataBuf.flip();
432 log.log(Level.FINE, "Going to send request...");
433 IOUtils.writeHeader(serverInfo, dataBuf);
434 }
435
436 IOUtils.nonBlockingWrite(channel, dataBuf);
437 log.log(Level.FINE, "Request sent.");
438 }
439 finally
440 {
441 IOUtils.closeQuite(out);
442 }
443 }
444
445 /***
446 * Non-blocking response reading.
447 *
448 * @param serverInfo ConnectionHeader received from the EJServer
449 * @param selKey
450 * @return server response received by the corresponding EJServer
451 * @throws IOException
452 */
453 private Object receive(ConnectionHeader serverInfo, SelectionKey selKey) throws IOException
454 {
455 Object result = null;
456 SocketChannel channel = (SocketChannel) selKey.channel();
457 ByteBuffer dataBuf = (ByteBuffer) selKey.attachment();
458 ByteBufferInputStream in = null;
459
460 try
461 {
462 if (dataBuf == null)
463 {
464 int length = IOUtils.readHeader(serverInfo);
465
466 if (length == 0)
467 {
468 return null;
469 }
470 dataBuf = ByteBuffer.allocateDirect(length);
471 dataBuf.limit(length);
472 log.log(Level.FINE, "Going to read server response with length: " + length);
473 }
474
475 IOUtils.nonBlockingRead(channel, dataBuf);
476 dataBuf.flip();
477 log.log(Level.FINE, "Response read. Now calling (de)serialize adapter to convert response back to object.");
478
479 in = new ByteBufferInputStream(dataBuf);
480 result = deserialize(in, serverInfo);
481
482 log.log(Level.FINE, "Server response successfully converted to object.");
483
484 selKey.cancel();
485 }
486 finally
487 {
488 IOUtils.closeQuite(in);
489 }
490
491 return result;
492 }
493
494 /***
495 * Combined blocking request sending and response reading.
496 *
497 * @param serverInfo
498 * ConnectionHeader received from the EJServer
499 * @param channel
500 * the connected SocketChannel
501 * @param obj
502 * the request
503 * @return server response received by the corresponding EJServer
504 * @throws IOException
505 */
506 private Object processBlocked(ConnectionHeader serverInfo, SocketChannel channel, Object obj) throws IOException
507 {
508 Object result = null;
509
510 if (channel != null)
511 {
512 serialize(Channels.newOutputStream(channel), obj, serverInfo);
513 result = deserialize(Channels.newInputStream(channel), serverInfo);
514 }
515
516 return result;
517 }
518
519 /***
520 * @param out
521 * @param obj
522 * @param serverInfo
523 * @throws IOException
524 */
525 private void serialize(OutputStream out, Object obj, ConnectionHeader serverInfo) throws IOException
526 {
527 boolean compressed = this._clientInfo.hasCompression() && serverInfo.hasCompression();
528 IOUtils.adapterSerialize(this._adapter, out, obj, compressed, this._clientInfo.getCompressionLevel());
529 }
530
531 /***
532 * @param in
533 * @param serverInfo
534 * @return
535 * @throws IOException
536 */
537 private Object deserialize(InputStream in, ConnectionHeader serverInfo) throws IOException
538 {
539 boolean compressed = this._clientInfo.hasCompression() && serverInfo.hasCompression();
540 return IOUtils.adapterDeserialize(this._adapter, in, compressed);
541 }
542
543 /***
544 * Reads the settings for this client from a given Properties instance
545 *
546 * @param props
547 */
548 private void loadProperties(Properties props)
549 {
550 String clazz = null;
551
552 try
553 {
554 clazz = props.getProperty("ejoe.adapter", ObjectStreamAdapter.class.getName());
555 _adapter = (SerializeAdapter) Class.forName(clazz).newInstance();
556 this._clientInfo.setAdapterName(clazz);
557 _host = props.getProperty("ejoe.host", "127.0.0.1");
558 _port = Integer.parseInt(props.getProperty("ejoe.port", String.valueOf(EJConstants.EJOE_PORT)));
559 _connectionTimeout = Integer.parseInt(props.getProperty("ejoe.connectionTimeout", String
560 .valueOf(EJConstants.EJOE_CONNECTION_TIMEOUT)));
561 this._clientInfo.setCompression(Boolean.valueOf(props.getProperty("ejoe.compression", "false"))
562 .booleanValue());
563 this._clientInfo.setPersistent(Boolean.valueOf(props.getProperty("ejoe.persistentConnection", "true"))
564 .booleanValue());
565
566 if (Boolean.valueOf(props.getProperty("ejoe.remoteClassloader", "false")).booleanValue())
567 {
568 _classLoaderPort = Integer.parseInt(props.getProperty("ejoe.classLoaderPort", String
569 .valueOf(EJConstants.EJOE_CLASSLOADER_PORT)));
570 initClassLoader();
571 }
572 }
573 catch (InstantiationException e)
574 {
575 log.log(Level.SEVERE, "Can't instantiate class " + clazz);
576 throw new RuntimeException(e);
577 }
578 catch (IllegalAccessException e)
579 {
580 log.log(Level.SEVERE, "Can't access configured class " + clazz);
581 throw new RuntimeException(e);
582 }
583 catch (ClassNotFoundException e)
584 {
585 log.log(Level.SEVERE, "Can't locate configured class " + clazz);
586 throw new RuntimeException(e);
587 }
588 }
589
590 /***
591 * Initializes a EJClassloader and sets it as the current context classloader. Also notifies the used
592 * SerializeAdapter to recognize the classloader change.
593 */
594 private void initClassLoader()
595 {
596 EJClassLoader ejcl = new EJClassLoader(Thread.currentThread().getContextClassLoader(), _host, _classLoaderPort);
597 Thread.currentThread().setContextClassLoader(ejcl);
598 this._adapter.handleClassLoaderChange(ejcl);
599 }
600 }