View Javadoc

1   /**********************************************************************
2    * EJClient.java
3    * created on 08.08.2004 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/EJClient.java,v $
5    * $Date: 2006/02/04 15:17:45 $
6    * $Revision: 1.58 $
7    *********************************************************************/
8   
9   package de.netseeker.ejoe;
10  
11  import java.io.FileInputStream;
12  import java.io.IOException;
13  import java.io.InputStream;
14  import java.io.OutputStream;
15  import java.io.Serializable;
16  import java.net.InetSocketAddress;
17  import java.nio.ByteBuffer;
18  import java.nio.channels.Channels;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.Selector;
21  import java.nio.channels.SocketChannel;
22  import java.rmi.RemoteException;
23  import java.util.Iterator;
24  import java.util.Properties;
25  import java.util.logging.Level;
26  import java.util.logging.Logger;
27  
28  import de.netseeker.ejoe.adapter.ObjectStreamAdapter;
29  import de.netseeker.ejoe.adapter.SerializeAdapter;
30  import de.netseeker.ejoe.adapter.XStreamAdapter;
31  import de.netseeker.ejoe.io.ByteBufferInputStream;
32  import de.netseeker.ejoe.io.ByteBufferOutputStream;
33  import de.netseeker.ejoe.io.IOUtils;
34  import de.netseeker.ejoe.io.IncompleteIOException;
35  
36  /***
37   * This is the client component of EJOE. You have to use this component to send and retrieve data to/from a EJOE server.
38   *
39   * @author netseeker aka Michael Manske
40   */
41  public class EJClient implements Serializable
42  {
43  	private static final long		serialVersionUID	= 3258413932539359280L;
44  
45  	private static final Logger		log					= Logger.getLogger(EJClient.class.getName());
46  
47  	private SerializeAdapter		_adapter;
48  
49  	private String					_host				= "127.0.0.1";
50  
51  	private int						_port				= EJConstants.EJOE_PORT;
52  
53  	private int						_classLoaderPort	= EJConstants.EJOE_CLASSLOADER_PORT;
54  
55  	private int						_connectionTimeout	= EJConstants.EJOE_CONNECTION_TIMEOUT;
56  
57  	private final ConnectionHeader	_clientInfo			= new ConnectionHeader();
58  
59  	private SocketChannel			_channel;
60  
61  	private boolean					_requestInProgress	= false;
62  
63  	/***
64  	 * Creates an instance of the EJOE client pre-configured with settings from the global ejoe.properties file. You
65  	 * MUST provide such an property file to use this constructor.
66  	 */
67  	public EJClient()
68  	{
69  		Properties props = new Properties();
70  		try
71  		{
72  			props.load(EJClient.class.getResourceAsStream("/ejoe.properties"));
73  			loadProperties(props);
74  		}
75  		catch (IOException e)
76  		{
77  			log
78  					.log(
79  							Level.SEVERE,
80  							"ejoe.properties could not be read! Make sure you have placed a valid properties file in your classpath.",
81  							e);
82  			throw new RuntimeException(e);
83  		}
84  	}
85  
86  	/***
87  	 * Creates an instance of the EJOE client pre-configured with settings from a global properties file.
88  	 *
89  	 * @param pathToConfigFile
90  	 *            path to the properties file
91  	 */
92  	public EJClient(String pathToConfigFile)
93  	{
94  		Properties props = new Properties();
95  		try
96  		{
97  			props.load(new FileInputStream(pathToConfigFile));
98  			loadProperties(props);
99  		}
100 		catch (IOException e)
101 		{
102 			log
103 					.log(
104 							Level.SEVERE,
105 							"ejoe.properties could not be read! Make sure you have placed a valid properties file in your classpath.",
106 							e);
107 			throw new RuntimeException(e);
108 		}
109 	}
110 
111 	/***
112 	 * Creates an instance of the EJOE client preconfigured to use an instance of
113 	 * de.netseeker.ejoe.adapter.ObjectStreamAdapter for (de)serializing.
114 	 *
115 	 * @param host
116 	 *            address (dns name or ip address) of the EJOE server
117 	 * @param port
118 	 *            port which the EJOE server listens to
119 	 */
120 	public EJClient(String host, int port)
121 	{
122 		this._host = host;
123 		this._port = port;
124 		this._adapter = new XStreamAdapter();
125 		this._clientInfo.setAdapterName(XStreamAdapter.class.getName());
126 	}
127 
128 	/***
129 	 * Creates an instance of the EJOE client.
130 	 *
131 	 * @param host
132 	 *            address (dns name or ip address) of the EJOE server
133 	 * @param port
134 	 *            port which the EJOE server listens to
135 	 * @param adapter
136 	 *            the adapter used for (de)serializing input paramter objects for the server and the return values
137 	 */
138 	public EJClient(String host, int port, final SerializeAdapter adapter)
139 	{
140 		this._host = host;
141 		this._port = port;
142 		this._adapter = adapter;
143 		this._clientInfo.setAdapterName(adapter.getClass().getName());
144 	}
145 
146 	/***
147 	 * Creates an instance of the EJOE client with remote classloading enabled.
148 	 *
149 	 * @param host
150 	 *            address (dns name or ip address) of the EJOE server
151 	 * @param port
152 	 *            port which the EJOE server listens to
153 	 * @param classLoaderPort
154 	 *            port which the EJOE class loader server listens to
155 	 * @param adapter
156 	 *            the adapter used for (de)serializing input paramter objects for the server and the return values
157 	 */
158 	public EJClient(String host, int port, int classLoaderPort, final SerializeAdapter adapter)
159 	{
160 		this(host, port, adapter);
161 		initClassLoader();
162 	}
163 
164 	/***
165 	 * Sets the connection timeout used when waiting for server responses. A value of zero (the default) blocks
166 	 * indefinitely.
167 	 *
168 	 * @param timeout
169 	 *            the new timeout in milliseconds
170 	 */
171 	public synchronized void setConnectionTimeout(int timeout)
172 	{
173 		this._connectionTimeout = timeout;
174 	}
175 
176 	/***
177 	 * Tells this client to use compression (if supported by the server) or not.
178 	 *
179 	 * @param enable
180 	 */
181 	public synchronized void enableCompression(boolean enable)
182 	{
183 		if (!this._requestInProgress)
184 		{
185 			this._clientInfo.setCompression(enable);
186 		}
187 		else
188 		{
189 			throw new IllegalStateException("Compression handling can't be changed while a request is in progress.");
190 		}
191 	}
192 
193 	/***
194 	 * Tells this client to use compression (if supported by the server) with the given compression level.
195 	 *
196 	 * @param compressionLevel
197 	 *            the level of compression to use, must be in range of 0-9
198 	 */
199 	public synchronized void enableCompression(int compressionLevel)
200 	{
201 		if (!this._requestInProgress)
202 		{
203 			this._clientInfo.setCompression(true);
204 			this._clientInfo.setCompressionLevel(compressionLevel);
205 		}
206 		else
207 		{
208 			throw new IllegalStateException("Compression handling can't be changed while a request is in progress.");
209 		}
210 	}
211 
212 	/***
213 	 * Enables/disables usage of a persistent connection. If persistent connection is disabled a new connection will be
214 	 * used for each request.
215 	 *
216 	 * @param enable
217 	 */
218 	public synchronized void enablePersistentConnection(boolean enable)
219 	{
220 		if (!this._requestInProgress)
221 		{
222 			this._clientInfo.setPersistent(enable);
223 		}
224 		else
225 		{
226 			throw new IllegalStateException("Connection handling can't be changed while a request is in progress.");
227 		}
228 	}
229 
230 	/***
231 	 * Enables remote classloading on the given remote port
232 	 *
233 	 * @param port
234 	 *            port on which the EJOE classloader server listens
235 	 */
236 	public synchronized void enableRemoteClassloading(int port)
237 	{
238 		if (Thread.currentThread().getContextClassLoader() instanceof EJClassLoader)
239 			throw new IllegalStateException("Remote classloading already enabled!");
240 
241 		this._classLoaderPort = port;
242 		initClassLoader();
243 	}
244 
245 	/***
246 	 * Enables remote classloading on the default remote port.
247 	 */
248 	public synchronized void enableRemoteClassloading()
249 	{
250 		if (Thread.currentThread().getContextClassLoader() instanceof EJClassLoader)
251 			throw new IllegalStateException("Remote classloading already enabled!");
252 
253 		initClassLoader();
254 	}
255 
256 	/***
257 	 * Main entry point for client tier implementations. Handles all remote server calls... This method is threadsafe,
258 	 * ensuring that only one request is processed at a time.
259 	 *
260 	 * @param obj
261 	 *            input objects for the EJOE Server
262 	 * @return the object(s) returned by the EJOE server
263 	 */
264 	public synchronized Object execute(Object obj) throws IOException
265 	{
266 		Object result = null;
267 
268 		// ensure that only one request is in process at one time without
269 		// blocking the whole client
270 		this._requestInProgress = true;
271 		ConnectionHeader serverInfo = null;
272 		SocketChannel sChannel = null;
273 		Selector selector = Selector.open();
274 
275 		try
276 		{
277 			sChannel = openSocketChannel();
278 			sChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE);
279 			Iterator it = null;
280 			SelectionKey selKey = null;
281 
282 			while (selector.select(_connectionTimeout) > 0)
283 			{
284 				it = selector.selectedKeys().iterator();
285 
286 				while (it.hasNext())
287 				{
288 					selKey = (SelectionKey) it.next();
289 					it.remove();
290 
291 					if (!selKey.isValid())
292 					{
293 						continue;
294 					}
295 
296 					try
297 					{
298 						if (selKey.isConnectable() && sChannel.isConnectionPending())
299 						{
300 							if (!sChannel.finishConnect())
301 							{
302 								continue;
303 							}
304 
305 							log.log(Level.INFO, "Connection established to "
306 									+ sChannel.socket().getRemoteSocketAddress());
307 						}
308 						if (selKey.isWritable())
309 						{
310 							if (serverInfo == null)
311 							{
312 								log.log(Level.FINEST, "Handshaking server...");
313 								serverInfo = IOUtils.handshake(sChannel, _clientInfo, true, _connectionTimeout);
314 								if (serverInfo == null)
315 								{
316 									throw new IOException("Connection timeout (" + _connectionTimeout
317 											+ "ms) reached while waiting for Handshake completing.");
318 								}
319 							}
320 
321 							if (serverInfo.hasNonBlockingReadWrite())
322 							{
323 								send(serverInfo, selKey, obj);
324 								sChannel.register(selector, SelectionKey.OP_READ);
325 							}
326 							else
327 							{
328 								selKey.cancel();
329 								sChannel.configureBlocking(true);
330 								result = processBlocked(serverInfo, sChannel, obj);
331 								return result;
332 							}
333 						}
334 						if (selKey.isReadable())
335 						{
336 							result = receive(serverInfo, selKey);
337 							return result;
338 						}
339 					}
340 					catch (IncompleteIOException ioe)
341 					{
342 						selKey.cancel();
343 						selector.selectNow();
344 						sChannel.register(selector, ioe.getSelectionInterest(), ioe.getIOBuffer());
345 					}
346 					catch (IOException e)
347 					{
348 						selKey.cancel();
349 						IOUtils.closeQuite(sChannel);
350 						// rethrow origin exception to inform the caller
351 						// without the hassle of nested exceptions
352 						throw e;
353 					}
354 				}
355 			}
356 		}
357 		finally
358 		{
359 			this._requestInProgress = false;
360 			IOUtils.closeQuite(selector);
361 
362 			if (!this._clientInfo.isPersistent() || !serverInfo.isPersistent())
363 			{
364 				IOUtils.closeQuite(sChannel);
365 			}
366 		}
367 
368 		if (result instanceof Throwable)
369 		{
370 			throw new RemoteException("The server did return an Exception while handling your request!",
371 					(Throwable) result);
372 		}
373 
374 		return result;
375 	}
376 
377 	/***
378 	 * Closes an existing persistent connection to the corresponding EJOE server. Invoking this method when
379 	 * non-persistent connections are used has no effect.
380 	 */
381 	public void close()
382 	{
383 		if (this._channel != null)
384 		{
385 			IOUtils.closeQuite(this._channel);
386 			this._channel = null;
387 		}
388 	}
389 
390 	/***
391 	 * Opens a new socket connection to the EJServer
392 	 *
393 	 * @return
394 	 * @throws IOException
395 	 */
396 	private SocketChannel openSocketChannel() throws IOException
397 	{
398 		SocketChannel sChannel = this._channel;
399 
400 		if (sChannel == null || !sChannel.isOpen() || !this._clientInfo.isPersistent())
401 		{
402 			sChannel = SocketChannel.open();
403 			sChannel.configureBlocking(false);
404 			sChannel.connect(new InetSocketAddress(_host, _port));
405 		}
406 
407 		return sChannel;
408 	}
409 
410 	/***
411 	 * Non-blocking request sending.
412 	 *
413 	 * @param serverInfo ConnectionHeader received from the EJServer
414 	 * @param selKey
415 	 * @param obj the request
416 	 * @throws IOException
417 	 */
418 	private void send(ConnectionHeader serverInfo, SelectionKey selKey, Object obj) throws IOException
419 	{
420 		SocketChannel channel = (SocketChannel) selKey.channel();
421 		ByteBuffer dataBuf = (ByteBuffer) selKey.attachment();
422 		ByteBufferOutputStream out = null;
423 
424 		try
425 		{
426 			if (dataBuf == null)
427 			{
428 				out = new ByteBufferOutputStream();
429 				serialize(out, obj, serverInfo);
430 				dataBuf = out.getBackingBuffer();
431 				dataBuf.flip();
432 				log.log(Level.FINE, "Going to send request...");
433 				IOUtils.writeHeader(serverInfo, dataBuf);
434 			}
435 
436 			IOUtils.nonBlockingWrite(channel, dataBuf);
437 			log.log(Level.FINE, "Request sent.");
438 		}
439 		finally
440 		{
441 			IOUtils.closeQuite(out);
442 		}
443 	}
444 
445 	/***
446 	 * Non-blocking response reading.
447 	 *
448 	 * @param serverInfo ConnectionHeader received from the EJServer
449 	 * @param selKey
450 	 * @return server response received by the corresponding EJServer
451 	 * @throws IOException
452 	 */
453 	private Object receive(ConnectionHeader serverInfo, SelectionKey selKey) throws IOException
454 	{
455 		Object result = null;
456 		SocketChannel channel = (SocketChannel) selKey.channel();
457 		ByteBuffer dataBuf = (ByteBuffer) selKey.attachment();
458 		ByteBufferInputStream in = null;
459 
460 		try
461 		{
462 			if (dataBuf == null)
463 			{
464 				int length = IOUtils.readHeader(serverInfo);
465 				// server signals a null result?
466 				if (length == 0)
467 				{
468 					return null;
469 				}
470 				dataBuf = ByteBuffer.allocateDirect(length);
471 				dataBuf.limit(length);
472 				log.log(Level.FINE, "Going to read server response with length: " + length);
473 			}
474 
475 			IOUtils.nonBlockingRead(channel, dataBuf);
476 			dataBuf.flip();
477 			log.log(Level.FINE, "Response read. Now calling (de)serialize adapter to convert response back to object.");
478 
479 			in = new ByteBufferInputStream(dataBuf);
480 			result = deserialize(in, serverInfo);
481 
482 			log.log(Level.FINE, "Server response successfully converted to object.");
483 
484 			selKey.cancel();
485 		}
486 		finally
487 		{
488 			IOUtils.closeQuite(in);
489 		}
490 
491 		return result;
492 	}
493 
494 	/***
495 	 * Combined blocking request sending and response reading.
496 	 *
497 	 * @param serverInfo
498 	 *            ConnectionHeader received from the EJServer
499 	 * @param channel
500 	 *            the connected SocketChannel
501 	 * @param obj
502 	 *            the request
503 	 * @return server response received by the corresponding EJServer
504 	 * @throws IOException
505 	 */
506 	private Object processBlocked(ConnectionHeader serverInfo, SocketChannel channel, Object obj) throws IOException
507 	{
508 		Object result = null;
509 
510 		if (channel != null)
511 		{
512 			serialize(Channels.newOutputStream(channel), obj, serverInfo);
513 			result = deserialize(Channels.newInputStream(channel), serverInfo);
514 		}
515 
516 		return result;
517 	}
518 
519 	/***
520 	 * @param out
521 	 * @param obj
522 	 * @param serverInfo
523 	 * @throws IOException
524 	 */
525 	private void serialize(OutputStream out, Object obj, ConnectionHeader serverInfo) throws IOException
526 	{
527 		boolean compressed = this._clientInfo.hasCompression() && serverInfo.hasCompression();
528 		IOUtils.adapterSerialize(this._adapter, out, obj, compressed, this._clientInfo.getCompressionLevel());
529 	}
530 
531 	/***
532 	 * @param in
533 	 * @param serverInfo
534 	 * @return
535 	 * @throws IOException
536 	 */
537 	private Object deserialize(InputStream in, ConnectionHeader serverInfo) throws IOException
538 	{
539 		boolean compressed = this._clientInfo.hasCompression() && serverInfo.hasCompression();
540 		return IOUtils.adapterDeserialize(this._adapter, in, compressed);
541 	}
542 
543 	/***
544 	 * Reads the settings for this client from a given Properties instance
545 	 *
546 	 * @param props
547 	 */
548 	private void loadProperties(Properties props)
549 	{
550 		String clazz = null;
551 
552 		try
553 		{
554 			clazz = props.getProperty("ejoe.adapter", ObjectStreamAdapter.class.getName());
555 			_adapter = (SerializeAdapter) Class.forName(clazz).newInstance();
556 			this._clientInfo.setAdapterName(clazz);
557 			_host = props.getProperty("ejoe.host", "127.0.0.1");
558 			_port = Integer.parseInt(props.getProperty("ejoe.port", String.valueOf(EJConstants.EJOE_PORT)));
559 			_connectionTimeout = Integer.parseInt(props.getProperty("ejoe.connectionTimeout", String
560 					.valueOf(EJConstants.EJOE_CONNECTION_TIMEOUT)));
561 			this._clientInfo.setCompression(Boolean.valueOf(props.getProperty("ejoe.compression", "false"))
562 					.booleanValue());
563 			this._clientInfo.setPersistent(Boolean.valueOf(props.getProperty("ejoe.persistentConnection", "true"))
564 					.booleanValue());
565 
566 			if (Boolean.valueOf(props.getProperty("ejoe.remoteClassloader", "false")).booleanValue())
567 			{
568 				_classLoaderPort = Integer.parseInt(props.getProperty("ejoe.classLoaderPort", String
569 						.valueOf(EJConstants.EJOE_CLASSLOADER_PORT)));
570 				initClassLoader();
571 			}
572 		}
573 		catch (InstantiationException e)
574 		{
575 			log.log(Level.SEVERE, "Can't instantiate class " + clazz);
576 			throw new RuntimeException(e);
577 		}
578 		catch (IllegalAccessException e)
579 		{
580 			log.log(Level.SEVERE, "Can't access configured class " + clazz);
581 			throw new RuntimeException(e);
582 		}
583 		catch (ClassNotFoundException e)
584 		{
585 			log.log(Level.SEVERE, "Can't locate configured class " + clazz);
586 			throw new RuntimeException(e);
587 		}
588 	}
589 
590 	/***
591 	 * Initializes a EJClassloader and sets it as the current context classloader. Also notifies the used
592 	 * SerializeAdapter to recognize the classloader change.
593 	 */
594 	private void initClassLoader()
595 	{
596 		EJClassLoader ejcl = new EJClassLoader(Thread.currentThread().getContextClassLoader(), _host, _classLoaderPort);
597 		Thread.currentThread().setContextClassLoader(ejcl);
598 		this._adapter.handleClassLoaderChange(ejcl);
599 	}
600 }