View Javadoc

1   /**********************************************************************
2    * EJServer.java
3    * created on 06.08.2004 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/EJServer.java,v $
5    * $Date: 2006/02/04 14:16:35 $
6    * $Revision: 1.43 $
7    *********************************************************************/
8   
9   package de.netseeker.ejoe;
10  
11  import java.io.IOException;
12  import java.net.InetSocketAddress;
13  import java.nio.channels.ServerSocketChannel;
14  import java.util.logging.Level;
15  import java.util.logging.Logger;
16  import java.util.zip.Deflater;
17  
18  import de.netseeker.ejoe.handler.ClassHandler;
19  import de.netseeker.ejoe.handler.ServerHandler;
20  import de.netseeker.ejoe.io.IOUtils;
21  
22  /***
23   * This is the server component of EJOE. EJOE is a request object broker in it's natural meaning. You have to use this
24   * component if you want retrieve and send data from/to EJOE clients. EJOE offers three things - and ONLY these three
25   * things for you:
26   * <ol>
27   * <li>a multithreaded, high performance io server</li>
28   * <li>(de)serializing input of input objects send by clients and return objects provided by YOUR business logic</li>
29   * <li>a simple, clean and unique interface to integrate a object request broker into your application</li>
30   * </ol>
31   *
32   * @author netseeker aka Michael Manske
33   */
34  public class EJServer
35  {
36  	public static final int				CON_USE_COMPRESSION				= 1;
37  
38  	public static final int				CON_USE_NONBLOCKING_IO			= 2;
39  
40  	public static final int				CON_USE_PERSISTENT				= 3;
41  
42  	private static final Logger			logger							= Logger.getLogger(EJServer.class.getName());
43  
44  	private ConnectionAcceptor			_acceptor, _classAcceptor;
45  
46  	private ConnectionProcessorManager	_processor, _classProcessor;
47  
48  	private ServerSocketChannel			_channel, _classLoaderChannel;
49  
50  	private final ServerHandler			_handler;
51  
52  	private int							_port, _classLoaderPort;
53  
54  	private int							_maxReadProcessors				= EJConstants.EJOE_MAX_READPROCESSORS,
55  			_maxWriteProcessors = EJConstants.EJOE_MAX_WRITEPROCESSORS;
56  
57  	private int							_compressionLevel				= Deflater.DEFAULT_COMPRESSION;
58  
59  	private boolean						_classServerEnabled				= false;
60  
61  	private boolean						_serverRunning					= false;
62  
63  	private boolean						_classServerRunning				= false;
64  
65  	private boolean						_useThreadedProcessors			= false;
66  
67  	private boolean						_useCompression					= false;
68  
69  	private boolean						_enableNewIO					= false;
70  
71  	private boolean						_enablePersistentConnections	= true;
72  
73  	/***
74  	 * Creates an instance of the EJOE server component pre-configured whith a default value for the used port
75  	 *
76  	 * @param handler
77  	 *            an implementation of de.netseeker.ejoe.ServerHandler
78  	 */
79  	public EJServer(final ServerHandler handler)
80  	{
81  		this(handler, EJConstants.EJOE_PORT);
82  	}
83  
84  	/***
85  	 * Creates an instance of the EJOE server component
86  	 *
87  	 * @param handler
88  	 *            an implementation of de.netseeker.ejoe.ServerHandler
89  	 * @param port
90  	 *            the port EJOE should listen to
91  	 */
92  	public EJServer(final ServerHandler handler, int port)
93  	{
94  		this._handler = handler;
95  		this._port = port;
96  	}
97  
98  	/***
99  	 * Creates an instance of the EJOE server component pre-configured whith a default value for the used port and using
100 	 * the given connection options.
101 	 *
102 	 * @param options
103 	 *            the options to use
104 	 * @param handler
105 	 *            an implementation of de.netseeker.ejoe.ServerHandler
106 	 * @see #CON_USE_COMPRESSION
107 	 * @see #CON_USE_NONBLOCKING_IO
108 	 * @see #CON_USE_PERSISTENT
109 	 */
110 	public EJServer(int options, final ServerHandler handler)
111 	{
112 		this(handler);
113 		setOptions(options);
114 	}
115 
116 	/***
117 	 * Creates an instance of the EJOE server component pre-configured whith the default (de)serializing mechanism and
118 	 * using the given connection options.
119 	 *
120 	 * @param options
121 	 *            the options to use
122 	 * @param handler
123 	 *            an implementation of de.netseeker.ejoe.ServerHandler
124 	 * @param port
125 	 *            the port EJOE should listen to
126 	 * @see #CON_USE_COMPRESSION
127 	 * @see #CON_USE_NONBLOCKING_IO
128 	 * @see #CON_USE_PERSISTENT
129 	 */
130 	public EJServer(int options, final ServerHandler handler, int port)
131 	{
132 		this(handler, port);
133 		setOptions(options);
134 	}
135 
136 	/***
137 	 * Sets the amount of threads used for processing read operations on accepted connections. This will indirectly
138 	 * control the amount of concurrently processed io read operations and adapter calls.
139 	 *
140 	 * @param maxProcessors
141 	 *            new amount of threads used for processing accepted connections
142 	 * @see #setMaxWriteProcessors(int)
143 	 */
144 	public void setMaxReadProcessors(int maxProcessors)
145 	{
146 		if (_processor == null || !_acceptor.isAlive())
147 		{
148 			this._maxReadProcessors = maxProcessors;
149 		}
150 		else
151 		{
152 			throw new IllegalStateException(
153 					"The value for the maximum of used read processor threads can't be changed while EJOE is already running!");
154 		}
155 	}
156 
157 	/***
158 	 * Sets the amount of threads used for processing write operations. This will directly control the amount of
159 	 * concurrently processed io socket write operations.
160 	 *
161 	 * @param maxProcessors
162 	 *            new amount of threads used for processing io socket write operations
163 	 * @see #setMaxReadProcessors(int)
164 	 */
165 	public void setMaxWriteProcessors(int maxProcessors)
166 	{
167 		if (_processor == null || !_acceptor.isAlive())
168 		{
169 			this._maxWriteProcessors = maxProcessors;
170 		}
171 		else
172 		{
173 			throw new IllegalStateException(
174 					"The value for the maximum of used write processor threads can't be changed while EJOE is already running!");
175 		}
176 	}
177 
178 	/***
179 	 * Enables the threaded processor feature. This means that for each incoming connection a new server thread will be
180 	 * started which will be responsible for processing read operations as well as invocation of the used ServerHandler.
181 	 * Unlike the default behavior (when a limited threadpool is used) the use of threaded processors will block as long
182 	 * as the amount of open processor threads is greater than or equal the max. processor limit. Enabling this feature
183 	 * *can* reduce cpu and memory consumption, especially if the server handler tasks are long time tasks or very
184 	 * resource hungry. It's strongly recommended to try different settings for the maximum of used processor threads to
185 	 * achieve best possible performance when enabling threaded processors.
186 	 *
187 	 * @see #setMaxProcessors(int)
188 	 */
189 	public void enableThreadedProcessorUsage(boolean enable)
190 	{
191 		if (_processor == null)
192 		{
193 			this._useThreadedProcessors = enable;
194 		}
195 		else
196 		{
197 			throw new IllegalStateException(
198 					"Processor threading strategy can't be changed while EJOE is already running!");
199 		}
200 	}
201 
202 	/***
203 	 * Enables or disables the usage of compressing/decompressing for outgoing/incoming data. Enabling compression *can*
204 	 * result in signifcantly better throughput especially when using a text based SerializeAdapter in confunction with
205 	 * large data objects.
206 	 *
207 	 * @see de.netseeker.ejoe.adapter.SerializeAdapter
208 	 */
209 	public void enableCompression(boolean enable)
210 	{
211 		if (_processor == null && _acceptor == null)
212 		{
213 			this._useCompression = enable;
214 		}
215 		else
216 		{
217 			throw new IllegalStateException("Compression setting can't be changed while EJOE is already running!");
218 		}
219 	}
220 
221 	/***
222 	 * Enables the usage of compressing/decompressing for outgoing/incoming data with the given compression level.
223 	 * Enabling compression *can* result in signifcantly better throughput especially when using a text based
224 	 * SerializeAdapter in confunction with large data objects.
225 	 *
226 	 * @param compressionLevel
227 	 *            the level of compression to use, must be in range of 0-9
228 	 * @see de.netseeker.ejoe.adapter.SerializeAdapter
229 	 */
230 	public void enableCompression(int compressionLevel)
231 	{
232 		if (_processor == null && _acceptor == null)
233 		{
234 			if (compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION)
235 			{
236 				throw new IllegalArgumentException("Compressionlevel must be in the range of "
237 						+ Deflater.NO_COMPRESSION + ":" + Deflater.BEST_COMPRESSION);
238 			}
239 			this._useCompression = true;
240 			this._compressionLevel = compressionLevel;
241 		}
242 		else
243 		{
244 			throw new IllegalStateException("Compression setting can't be changed while EJOE is already running!");
245 		}
246 	}
247 
248 	/***
249 	 * Enables/disables new style non-blocking IO for read and write operations. Sometimes this can be significantly
250 	 * faster than using blocking io. Be aware that connection acception as well as EJOES internal connection
251 	 * handshaking will use NIO anyway to ensure high latency.
252 	 */
253 	public void enableNonBlockingIO(boolean enable)
254 	{
255 		if (_processor == null && _acceptor == null)
256 		{
257 			this._enableNewIO = enable;
258 		}
259 		else
260 		{
261 			throw new IllegalStateException(
262 					"Blocking/NonBlocking IO behavior can't be changed while EJOE is already running!");
263 		}
264 	}
265 
266 	/***
267 	 * Enables/disables support for persistents client connections. Usage of persistent connections is more performant
268 	 * in most cases because the client doesn't need to open a new connection on each request. The drawback of
269 	 * persistent connections can be a higher server load because the server may have to handle a lot of "idle" client
270 	 * connections.
271 	 *
272 	 * @param enable
273 	 */
274 	public void enablePersistentConnections(boolean enable)
275 	{
276 		if (_processor == null && _acceptor == null)
277 		{
278 			this._enablePersistentConnections = enable;
279 		}
280 		else
281 		{
282 			throw new IllegalStateException(
283 					"Usage of persistent connections can't be changed while EJOE is already running!");
284 		}
285 	}
286 
287 	/***
288 	 * Enables support for remote classloading via a second EJOE instance using a default port and a special
289 	 * ServerHandler for class loading requests.
290 	 */
291 	public void enableRemoteClassLoading(boolean enable)
292 	{
293 		if (_classProcessor == null && _classAcceptor == null)
294 		{
295 			if (enable)
296 			{
297 				enableRemoteClassLoading(EJConstants.EJOE_CLASSLOADER_PORT);
298 			}
299 			else
300 			{
301 				this._classServerEnabled = false;
302 			}
303 		}
304 		else
305 		{
306 			throw new IllegalStateException(
307 					"Remote classloading can't be changed while EJOE ClassLoader server is already running!");
308 		}
309 	}
310 
311 	/***
312 	 * Enables support for remote classloading via a second EJOE instance using a special ServerHandler for class
313 	 * loading requests.
314 	 *
315 	 * @param port
316 	 *            The port which the class loader server should use
317 	 */
318 	public void enableRemoteClassLoading(int port)
319 	{
320 		if (_classProcessor == null && _classAcceptor == null)
321 		{
322 			this._classLoaderPort = port;
323 			this._classServerEnabled = true;
324 		}
325 		else
326 		{
327 			throw new IllegalStateException(
328 					"Remote classloading can't be changed while EJOE ClassLoader server is already running!");
329 		}
330 	}
331 
332 	/***
333 	 * (Re)Starts the main server as well as the class loader server (if it's configured)
334 	 */
335 	public void start() throws IOException
336 	{
337 		logger.log(Level.INFO, "Starting EJOE server...");
338 
339 		if (this._serverRunning)
340 		{
341 			logger.log(Level.WARNING, "EJOE server already running - will try a restart.");
342 			stop();
343 		}
344 
345 		if (this._classServerEnabled && !this._classServerRunning)
346 		{
347 			startServer(true);
348 			logger.log(Level.INFO, "Classloader server started successfully.");
349 		}
350 
351 		startServer(false);
352 		logger.log(Level.INFO, "EJOE server started successfully.");
353 	}
354 
355 	/***
356 	 * Stops the main server as well as the class loader server (if it's running)
357 	 */
358 	public void stop()
359 	{
360 		logger.log(Level.INFO, "Stopping EJOE server...");
361 
362 		if (this._classServerRunning)
363 		{
364 			this._classAcceptor.interrupt();
365 			this._classProcessor.interrupt();
366 			this._classServerRunning = false;
367 			IOUtils.closeQuite(this._classLoaderChannel);
368 			this._classAcceptor = null;
369 			this._classProcessor = null;
370 			logger.log(Level.INFO, "Classloader server stopped.");
371 		}
372 
373 		if (this._serverRunning)
374 		{
375 			this._acceptor.interrupt();
376 			this._processor.interrupt();
377 			this._serverRunning = false;
378 			IOUtils.closeQuite(this._channel);
379 			this._acceptor = null;
380 			this._processor = null;
381 			logger.log(Level.INFO, "EJOE server stopped.");
382 		}
383 	}
384 
385 	/***
386 	 * @param classloader
387 	 * @throws IOException
388 	 */
389 	private void startServer(boolean classloader) throws IOException
390 	{
391 		boolean running = !classloader ? this._serverRunning : this._classServerRunning;
392 
393 		if (!running)
394 		{
395 			int port = EJConstants.EJOE_PORT;
396 			int readProcessCount = EJConstants.EJOE_MAX_READPROCESSORS;
397 			int writeProcessCount = EJConstants.EJOE_MAX_READPROCESSORS;
398 			ServerHandler handler = null;
399 			ConnectionAcceptor acceptor = null;
400 			ConnectionProcessorManager processor = null;
401 			ServerSocketChannel channel = null;
402 
403 			if (!classloader)
404 			{
405 				port = this._port;
406 				readProcessCount = this._maxReadProcessors;
407 				writeProcessCount = this._maxWriteProcessors;
408 				handler = this._handler;
409 			}
410 			else
411 			{
412 				port = this._classLoaderPort;
413 				handler = new ClassHandler();
414 			}
415 
416 			try
417 			{
418 				ConnectionHeader header = new ConnectionHeader();
419 				header.setCompression(this._useCompression);
420 				header.setNonBlockingReadWrite(this._enableNewIO);
421 				header.setPersistent(this._enablePersistentConnections);
422 				channel = ServerSocketChannel.open();
423 				channel.configureBlocking(false);
424 				channel.socket().setReuseAddress(true);
425 				channel.socket().bind(new InetSocketAddress(port));
426 				header.setChannel(channel);
427 				processor = new ConnectionProcessorManager(header, handler, readProcessCount, writeProcessCount,
428 						this._useThreadedProcessors);
429 				acceptor = new ConnectionAcceptor(channel, processor);
430 			}
431 			catch (IOException e)
432 			{
433 				logger.log(Level.SEVERE, "!!! IOException occured !!! ", e);
434 				IOUtils.closeQuite(channel);
435 				throw (e);
436 			}
437 
438 			//processor.setDaemon(true);
439 			processor.start();
440 			//acceptor.setDaemon(true);
441 			acceptor.start();
442 
443 			if (!classloader)
444 			{
445 				this._serverRunning = true;
446 				this._acceptor = acceptor;
447 				this._processor = processor;
448 				this._channel = channel;
449 				if (logger.isLoggable(Level.INFO))
450 				{
451 					logger.log(Level.INFO, "EJOE server listening on: " + channel.socket().getLocalSocketAddress());
452 					logger.log(Level.INFO, "Using non-blocking IO: " + this._enableNewIO);
453 					logger.log(Level.INFO, "Allowing persistent client connections: "
454 							+ this._enablePersistentConnections);
455 					logger.log(Level.INFO, "Using compression: " + this._useCompression);
456 				}
457 			}
458 			else
459 			{
460 				this._classServerRunning = true;
461 				this._classAcceptor = acceptor;
462 				this._classProcessor = processor;
463 				this._classLoaderChannel = channel;
464 				logger.log(Level.INFO, "EJOE classloader server listening on: "
465 						+ channel.socket().getLocalSocketAddress());
466 			}
467 		}
468 	}
469 
470 	private void setOptions(int options)
471 	{
472 		enableNonBlockingIO((options & CON_USE_NONBLOCKING_IO) == CON_USE_NONBLOCKING_IO);
473 		enableCompression((options & CON_USE_COMPRESSION) == CON_USE_COMPRESSION);
474 		enablePersistentConnections((options & CON_USE_PERSISTENT) == CON_USE_PERSISTENT);
475 	}
476 }