1 /**********************************************************************
2 * EJServer.java
3 * created on 06.08.2004 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/EJServer.java,v $
5 * $Date: 2006/02/04 14:16:35 $
6 * $Revision: 1.43 $
7 *********************************************************************/
8
9 package de.netseeker.ejoe;
10
11 import java.io.IOException;
12 import java.net.InetSocketAddress;
13 import java.nio.channels.ServerSocketChannel;
14 import java.util.logging.Level;
15 import java.util.logging.Logger;
16 import java.util.zip.Deflater;
17
18 import de.netseeker.ejoe.handler.ClassHandler;
19 import de.netseeker.ejoe.handler.ServerHandler;
20 import de.netseeker.ejoe.io.IOUtils;
21
22 /***
23 * This is the server component of EJOE. EJOE is a request object broker in it's natural meaning. You have to use this
24 * component if you want retrieve and send data from/to EJOE clients. EJOE offers three things - and ONLY these three
25 * things for you:
26 * <ol>
27 * <li>a multithreaded, high performance io server</li>
28 * <li>(de)serializing input of input objects send by clients and return objects provided by YOUR business logic</li>
29 * <li>a simple, clean and unique interface to integrate a object request broker into your application</li>
30 * </ol>
31 *
32 * @author netseeker aka Michael Manske
33 */
34 public class EJServer
35 {
36 public static final int CON_USE_COMPRESSION = 1;
37
38 public static final int CON_USE_NONBLOCKING_IO = 2;
39
40 public static final int CON_USE_PERSISTENT = 3;
41
42 private static final Logger logger = Logger.getLogger(EJServer.class.getName());
43
44 private ConnectionAcceptor _acceptor, _classAcceptor;
45
46 private ConnectionProcessorManager _processor, _classProcessor;
47
48 private ServerSocketChannel _channel, _classLoaderChannel;
49
50 private final ServerHandler _handler;
51
52 private int _port, _classLoaderPort;
53
54 private int _maxReadProcessors = EJConstants.EJOE_MAX_READPROCESSORS,
55 _maxWriteProcessors = EJConstants.EJOE_MAX_WRITEPROCESSORS;
56
57 private int _compressionLevel = Deflater.DEFAULT_COMPRESSION;
58
59 private boolean _classServerEnabled = false;
60
61 private boolean _serverRunning = false;
62
63 private boolean _classServerRunning = false;
64
65 private boolean _useThreadedProcessors = false;
66
67 private boolean _useCompression = false;
68
69 private boolean _enableNewIO = false;
70
71 private boolean _enablePersistentConnections = true;
72
73 /***
74 * Creates an instance of the EJOE server component pre-configured whith a default value for the used port
75 *
76 * @param handler
77 * an implementation of de.netseeker.ejoe.ServerHandler
78 */
79 public EJServer(final ServerHandler handler)
80 {
81 this(handler, EJConstants.EJOE_PORT);
82 }
83
84 /***
85 * Creates an instance of the EJOE server component
86 *
87 * @param handler
88 * an implementation of de.netseeker.ejoe.ServerHandler
89 * @param port
90 * the port EJOE should listen to
91 */
92 public EJServer(final ServerHandler handler, int port)
93 {
94 this._handler = handler;
95 this._port = port;
96 }
97
98 /***
99 * Creates an instance of the EJOE server component pre-configured whith a default value for the used port and using
100 * the given connection options.
101 *
102 * @param options
103 * the options to use
104 * @param handler
105 * an implementation of de.netseeker.ejoe.ServerHandler
106 * @see #CON_USE_COMPRESSION
107 * @see #CON_USE_NONBLOCKING_IO
108 * @see #CON_USE_PERSISTENT
109 */
110 public EJServer(int options, final ServerHandler handler)
111 {
112 this(handler);
113 setOptions(options);
114 }
115
116 /***
117 * Creates an instance of the EJOE server component pre-configured whith the default (de)serializing mechanism and
118 * using the given connection options.
119 *
120 * @param options
121 * the options to use
122 * @param handler
123 * an implementation of de.netseeker.ejoe.ServerHandler
124 * @param port
125 * the port EJOE should listen to
126 * @see #CON_USE_COMPRESSION
127 * @see #CON_USE_NONBLOCKING_IO
128 * @see #CON_USE_PERSISTENT
129 */
130 public EJServer(int options, final ServerHandler handler, int port)
131 {
132 this(handler, port);
133 setOptions(options);
134 }
135
136 /***
137 * Sets the amount of threads used for processing read operations on accepted connections. This will indirectly
138 * control the amount of concurrently processed io read operations and adapter calls.
139 *
140 * @param maxProcessors
141 * new amount of threads used for processing accepted connections
142 * @see #setMaxWriteProcessors(int)
143 */
144 public void setMaxReadProcessors(int maxProcessors)
145 {
146 if (_processor == null || !_acceptor.isAlive())
147 {
148 this._maxReadProcessors = maxProcessors;
149 }
150 else
151 {
152 throw new IllegalStateException(
153 "The value for the maximum of used read processor threads can't be changed while EJOE is already running!");
154 }
155 }
156
157 /***
158 * Sets the amount of threads used for processing write operations. This will directly control the amount of
159 * concurrently processed io socket write operations.
160 *
161 * @param maxProcessors
162 * new amount of threads used for processing io socket write operations
163 * @see #setMaxReadProcessors(int)
164 */
165 public void setMaxWriteProcessors(int maxProcessors)
166 {
167 if (_processor == null || !_acceptor.isAlive())
168 {
169 this._maxWriteProcessors = maxProcessors;
170 }
171 else
172 {
173 throw new IllegalStateException(
174 "The value for the maximum of used write processor threads can't be changed while EJOE is already running!");
175 }
176 }
177
178 /***
179 * Enables the threaded processor feature. This means that for each incoming connection a new server thread will be
180 * started which will be responsible for processing read operations as well as invocation of the used ServerHandler.
181 * Unlike the default behavior (when a limited threadpool is used) the use of threaded processors will block as long
182 * as the amount of open processor threads is greater than or equal the max. processor limit. Enabling this feature
183 * *can* reduce cpu and memory consumption, especially if the server handler tasks are long time tasks or very
184 * resource hungry. It's strongly recommended to try different settings for the maximum of used processor threads to
185 * achieve best possible performance when enabling threaded processors.
186 *
187 * @see #setMaxProcessors(int)
188 */
189 public void enableThreadedProcessorUsage(boolean enable)
190 {
191 if (_processor == null)
192 {
193 this._useThreadedProcessors = enable;
194 }
195 else
196 {
197 throw new IllegalStateException(
198 "Processor threading strategy can't be changed while EJOE is already running!");
199 }
200 }
201
202 /***
203 * Enables or disables the usage of compressing/decompressing for outgoing/incoming data. Enabling compression *can*
204 * result in signifcantly better throughput especially when using a text based SerializeAdapter in confunction with
205 * large data objects.
206 *
207 * @see de.netseeker.ejoe.adapter.SerializeAdapter
208 */
209 public void enableCompression(boolean enable)
210 {
211 if (_processor == null && _acceptor == null)
212 {
213 this._useCompression = enable;
214 }
215 else
216 {
217 throw new IllegalStateException("Compression setting can't be changed while EJOE is already running!");
218 }
219 }
220
221 /***
222 * Enables the usage of compressing/decompressing for outgoing/incoming data with the given compression level.
223 * Enabling compression *can* result in signifcantly better throughput especially when using a text based
224 * SerializeAdapter in confunction with large data objects.
225 *
226 * @param compressionLevel
227 * the level of compression to use, must be in range of 0-9
228 * @see de.netseeker.ejoe.adapter.SerializeAdapter
229 */
230 public void enableCompression(int compressionLevel)
231 {
232 if (_processor == null && _acceptor == null)
233 {
234 if (compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION)
235 {
236 throw new IllegalArgumentException("Compressionlevel must be in the range of "
237 + Deflater.NO_COMPRESSION + ":" + Deflater.BEST_COMPRESSION);
238 }
239 this._useCompression = true;
240 this._compressionLevel = compressionLevel;
241 }
242 else
243 {
244 throw new IllegalStateException("Compression setting can't be changed while EJOE is already running!");
245 }
246 }
247
248 /***
249 * Enables/disables new style non-blocking IO for read and write operations. Sometimes this can be significantly
250 * faster than using blocking io. Be aware that connection acception as well as EJOES internal connection
251 * handshaking will use NIO anyway to ensure high latency.
252 */
253 public void enableNonBlockingIO(boolean enable)
254 {
255 if (_processor == null && _acceptor == null)
256 {
257 this._enableNewIO = enable;
258 }
259 else
260 {
261 throw new IllegalStateException(
262 "Blocking/NonBlocking IO behavior can't be changed while EJOE is already running!");
263 }
264 }
265
266 /***
267 * Enables/disables support for persistents client connections. Usage of persistent connections is more performant
268 * in most cases because the client doesn't need to open a new connection on each request. The drawback of
269 * persistent connections can be a higher server load because the server may have to handle a lot of "idle" client
270 * connections.
271 *
272 * @param enable
273 */
274 public void enablePersistentConnections(boolean enable)
275 {
276 if (_processor == null && _acceptor == null)
277 {
278 this._enablePersistentConnections = enable;
279 }
280 else
281 {
282 throw new IllegalStateException(
283 "Usage of persistent connections can't be changed while EJOE is already running!");
284 }
285 }
286
287 /***
288 * Enables support for remote classloading via a second EJOE instance using a default port and a special
289 * ServerHandler for class loading requests.
290 */
291 public void enableRemoteClassLoading(boolean enable)
292 {
293 if (_classProcessor == null && _classAcceptor == null)
294 {
295 if (enable)
296 {
297 enableRemoteClassLoading(EJConstants.EJOE_CLASSLOADER_PORT);
298 }
299 else
300 {
301 this._classServerEnabled = false;
302 }
303 }
304 else
305 {
306 throw new IllegalStateException(
307 "Remote classloading can't be changed while EJOE ClassLoader server is already running!");
308 }
309 }
310
311 /***
312 * Enables support for remote classloading via a second EJOE instance using a special ServerHandler for class
313 * loading requests.
314 *
315 * @param port
316 * The port which the class loader server should use
317 */
318 public void enableRemoteClassLoading(int port)
319 {
320 if (_classProcessor == null && _classAcceptor == null)
321 {
322 this._classLoaderPort = port;
323 this._classServerEnabled = true;
324 }
325 else
326 {
327 throw new IllegalStateException(
328 "Remote classloading can't be changed while EJOE ClassLoader server is already running!");
329 }
330 }
331
332 /***
333 * (Re)Starts the main server as well as the class loader server (if it's configured)
334 */
335 public void start() throws IOException
336 {
337 logger.log(Level.INFO, "Starting EJOE server...");
338
339 if (this._serverRunning)
340 {
341 logger.log(Level.WARNING, "EJOE server already running - will try a restart.");
342 stop();
343 }
344
345 if (this._classServerEnabled && !this._classServerRunning)
346 {
347 startServer(true);
348 logger.log(Level.INFO, "Classloader server started successfully.");
349 }
350
351 startServer(false);
352 logger.log(Level.INFO, "EJOE server started successfully.");
353 }
354
355 /***
356 * Stops the main server as well as the class loader server (if it's running)
357 */
358 public void stop()
359 {
360 logger.log(Level.INFO, "Stopping EJOE server...");
361
362 if (this._classServerRunning)
363 {
364 this._classAcceptor.interrupt();
365 this._classProcessor.interrupt();
366 this._classServerRunning = false;
367 IOUtils.closeQuite(this._classLoaderChannel);
368 this._classAcceptor = null;
369 this._classProcessor = null;
370 logger.log(Level.INFO, "Classloader server stopped.");
371 }
372
373 if (this._serverRunning)
374 {
375 this._acceptor.interrupt();
376 this._processor.interrupt();
377 this._serverRunning = false;
378 IOUtils.closeQuite(this._channel);
379 this._acceptor = null;
380 this._processor = null;
381 logger.log(Level.INFO, "EJOE server stopped.");
382 }
383 }
384
385 /***
386 * @param classloader
387 * @throws IOException
388 */
389 private void startServer(boolean classloader) throws IOException
390 {
391 boolean running = !classloader ? this._serverRunning : this._classServerRunning;
392
393 if (!running)
394 {
395 int port = EJConstants.EJOE_PORT;
396 int readProcessCount = EJConstants.EJOE_MAX_READPROCESSORS;
397 int writeProcessCount = EJConstants.EJOE_MAX_READPROCESSORS;
398 ServerHandler handler = null;
399 ConnectionAcceptor acceptor = null;
400 ConnectionProcessorManager processor = null;
401 ServerSocketChannel channel = null;
402
403 if (!classloader)
404 {
405 port = this._port;
406 readProcessCount = this._maxReadProcessors;
407 writeProcessCount = this._maxWriteProcessors;
408 handler = this._handler;
409 }
410 else
411 {
412 port = this._classLoaderPort;
413 handler = new ClassHandler();
414 }
415
416 try
417 {
418 ConnectionHeader header = new ConnectionHeader();
419 header.setCompression(this._useCompression);
420 header.setNonBlockingReadWrite(this._enableNewIO);
421 header.setPersistent(this._enablePersistentConnections);
422 channel = ServerSocketChannel.open();
423 channel.configureBlocking(false);
424 channel.socket().setReuseAddress(true);
425 channel.socket().bind(new InetSocketAddress(port));
426 header.setChannel(channel);
427 processor = new ConnectionProcessorManager(header, handler, readProcessCount, writeProcessCount,
428 this._useThreadedProcessors);
429 acceptor = new ConnectionAcceptor(channel, processor);
430 }
431 catch (IOException e)
432 {
433 logger.log(Level.SEVERE, "!!! IOException occured !!! ", e);
434 IOUtils.closeQuite(channel);
435 throw (e);
436 }
437
438
439 processor.start();
440
441 acceptor.start();
442
443 if (!classloader)
444 {
445 this._serverRunning = true;
446 this._acceptor = acceptor;
447 this._processor = processor;
448 this._channel = channel;
449 if (logger.isLoggable(Level.INFO))
450 {
451 logger.log(Level.INFO, "EJOE server listening on: " + channel.socket().getLocalSocketAddress());
452 logger.log(Level.INFO, "Using non-blocking IO: " + this._enableNewIO);
453 logger.log(Level.INFO, "Allowing persistent client connections: "
454 + this._enablePersistentConnections);
455 logger.log(Level.INFO, "Using compression: " + this._useCompression);
456 }
457 }
458 else
459 {
460 this._classServerRunning = true;
461 this._classAcceptor = acceptor;
462 this._classProcessor = processor;
463 this._classLoaderChannel = channel;
464 logger.log(Level.INFO, "EJOE classloader server listening on: "
465 + channel.socket().getLocalSocketAddress());
466 }
467 }
468 }
469
470 private void setOptions(int options)
471 {
472 enableNonBlockingIO((options & CON_USE_NONBLOCKING_IO) == CON_USE_NONBLOCKING_IO);
473 enableCompression((options & CON_USE_COMPRESSION) == CON_USE_COMPRESSION);
474 enablePersistentConnections((options & CON_USE_PERSISTENT) == CON_USE_PERSISTENT);
475 }
476 }