View Javadoc

1   /**********************************************************************
2    * CombinedConnectionProcessor.java
3    * created on 01.03.2005 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/core/CombinedConnectionProcessor.java,v $
5    * $Date: 2007/03/22 21:01:28 $
6    * $Revision: 1.3 $
7    *
8    * ====================================================================
9    *
10   *  Copyright 2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the ejoe framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  package de.netseeker.ejoe.core;
31  
32  import java.io.IOException;
33  import java.nio.channels.CancelledKeyException;
34  import java.nio.channels.SelectionKey;
35  import java.nio.channels.Selector;
36  import java.nio.channels.SocketChannel;
37  import java.util.ArrayList;
38  import java.util.Collections;
39  import java.util.Iterator;
40  import java.util.List;
41  import java.util.Set;
42  import java.util.Timer;
43  import java.util.logging.Level;
44  import java.util.logging.Logger;
45  
46  import de.netseeker.ejoe.ConnectionHeader;
47  import de.netseeker.ejoe.EJConstants;
48  import de.netseeker.ejoe.ServerInfo;
49  import de.netseeker.ejoe.concurrent.ThreadPoolFactory;
50  import de.netseeker.ejoe.concurrent.ThreadPoolResizer;
51  import de.netseeker.ejoe.concurrent.ThreadService;
52  import de.netseeker.ejoe.io.IOUtil;
53  
54  /***
55   * The CombinedConnectionProcessor handles the further processing of accepted client connections. It simply does only
56   * separate the readable connections from the incoming flood of accepted connections and schedules a new
57   * ConnectionProcessor which will handle read/write operations and adapter invoking for each of the selected
58   * connections. This is compareable to the *old BIO style* where one thread is used for each connection.
59   * 
60   * @author netseeker
61   * @since 0.3.5
62   */
63  public final class CombinedConnectionProcessor extends Thread implements ChannelRegistrar
64  {
65      private static final Logger  logger       = Logger.getLogger( ConnectionAcceptor.class.getName() );
66  
67      private static final Integer INT_OP_WRITE = new Integer( SelectionKey.OP_WRITE );
68  
69      private static final Integer INT_OP_READ  = new Integer( SelectionKey.OP_READ );
70  
71      private ServerInfo           _serverInfo;
72  
73      private ThreadGroup          _readGroup, _writeGroup;
74  
75      private ThreadService        _readpool, _writePool;
76  
77      private Selector             _selector;
78  
79      private List                 _aspirants;
80  
81      private int                  _load        = 0;
82  
83      /***
84       * Creates a new CombinedConnectionProcessor instance. Using this constructor will force the newly created instance
85       * to use a non-blocking threadpool for invocation of read processor threads.
86       * 
87       * @param serverInfo prefilled ConnectionHeader container server settings, eg. compression setting
88       * @throws IOException
89       */
90      public CombinedConnectionProcessor(ServerInfo serverInfo) throws IOException
91      {
92          super( "EJOE CombinedConnectionProcessor" );
93          this._serverInfo = serverInfo;
94          // use our own ThreadGroups to simplify identifying EJOE threads in the
95          // java (or systems) process list
96          this._readGroup = new ThreadGroup( "EJOE_RP_THREADS" );
97          this._writeGroup = new ThreadGroup( "EJOE_W_THREADS" );
98  
99          this._aspirants = Collections.synchronizedList( new ArrayList( 64 ) );
100 
101         // open a global Selector instance
102         this._selector = Selector.open();
103 
104         // clean out previously cancelled keys - system might reuse selectors
105         this._selector.selectNow();
106     }
107 
108     /*
109      * (non-Javadoc)
110      * 
111      * @see java.lang.Thread#run()
112      */
113     public void run()
114     {
115         Timer timer = null;
116 
117         this._readpool = ThreadPoolFactory.createFixedThreadPool( this._serverInfo.getMaxReadProcessors(),
118                                                                   this._readGroup );
119         // create a global ThreadPool for network write operations
120         this._writePool = ThreadPoolFactory.createFixedThreadPool( this._serverInfo.getMaxWriteProcessors(),
121                                                                    this._writeGroup );
122 
123         // shall we monitor the ThreadPools and resize them
124         // according to server load (this will also recreate died worker
125         // threads)
126         if ( this._serverInfo.isAutomaticThreadPoolResize() )
127         {
128             timer = new Timer();
129 
130             // schedule a monitor task for the read-process-thread-pool
131             timer.schedule( new ThreadPoolResizer( this._readpool, this._serverInfo.getMaxReadProcessors(),
132                                                    EJConstants.EJOE_POOL_RESIZER_SHRINKWAIT ), this._serverInfo
133                     .getPoolResizePeriod(), this._serverInfo.getPoolResizePeriod() );
134             // schedule a monitor task for the writer-thread-pool
135             timer.schedule( new ThreadPoolResizer( this._writePool, this._serverInfo.getMaxWriteProcessors(),
136                                                    EJConstants.EJOE_POOL_RESIZER_SHRINKWAIT ), this._serverInfo
137                     .getPoolResizePeriod(), this._serverInfo.getPoolResizePeriod() );
138         }
139 
140         try
141         {
142             Iterator it;
143             SocketChannel cChannel;
144             SelectionKey selKey;
145             ConnectionHeader clientInfo;
146             Set keys;
147 
148             while ( !isInterrupted() )
149             {
150                 this._load = 0;
151 
152                 // (pre)register interested socket channels
153                 registerAspirants();
154 
155                 // just try endless new selects until there are interested
156                 // socket channels
157                 if ( _selector.select() == 0 ) continue;
158 
159                 keys = this._selector.selectedKeys();
160                 this._load = keys.size();
161                 it = this._selector.selectedKeys().iterator();
162 
163                 // loop over all selected channels, just take care of thread
164                 // interruption
165                 while ( it.hasNext() && !isInterrupted() )
166                 {
167                     selKey = (SelectionKey) it.next();
168                     // remove the SelectionKey from the Iterator otherwise it
169                     // will be lost
170                     it.remove();
171 
172                     try
173                     {
174                         // validate the key
175                         if ( !selKey.isValid() ) continue;
176 
177                         // at least our ConnectionAcceptor has created a client
178                         // ConnectionHeader
179                         clientInfo = (ConnectionHeader) selKey.attachment();
180 
181                         // first check read-availbility
182                         if ( selKey.isReadable() )
183                         {
184                             // get the underlying socket channel
185                             cChannel = (SocketChannel) selKey.channel();
186                             // cancel the channels registration with our
187                             // Selector
188                             selKey.cancel();
189 
190                             // little bit paranoia
191                             if ( cChannel != null && cChannel.isOpen() )
192                             {
193                                 // don't we support NIO?
194                                 if ( !this._serverInfo.hasNonBlockingReadWrite() )
195                                 {
196                                     logger.log( Level.FINEST,
197                                                 "Setting socket to blocking mode for further io operations..." );
198 
199                                     // prepare the channel for upcoming blocking
200                                     // network operations
201                                     cChannel.configureBlocking( true );
202                                 }
203 
204                                 // schedule a asynchronious read-process operation
205                                 this._readpool.invokeLater( new ConnectionReader( this, this._serverInfo, clientInfo ) );
206                             }
207                         }
208                         else if ( selKey.isWritable() )
209                         {
210                             // get the underlying socket channel
211                             cChannel = (SocketChannel) selKey.channel();
212                             // cancel the channels registration with our
213                             // Selector
214                             selKey.cancel();
215 
216                             // little bit paranoia
217                             if ( cChannel != null && cChannel.isOpen() )
218                             {
219                                 // don't we support NIO?
220                                 if ( !this._serverInfo.hasNonBlockingReadWrite() )
221                                 {
222                                     if ( !clientInfo.hasAttachment() ) continue;
223                                     // prepare the channel for upcoming blocking
224                                     // network operations
225                                     cChannel.configureBlocking( true );
226                                 }
227 
228                                 // schedule a asynchronious socket-write
229                                 // operation
230                                 this._writePool
231                                         .invokeLater( new ConnectionWriter( this, this._serverInfo, clientInfo ) );
232                             }
233                         }
234                     }
235                     catch ( CancelledKeyException cke )
236                     {
237                         logger.log( Level.WARNING, "Key cancelled!", cke );
238                     }
239                     finally
240                     {
241                         this._load--;
242                     }
243                 }
244             }
245         }
246         catch ( IOException e )
247         {
248             logger.log( Level.SEVERE, "!!! IOException occured !!! ", e );
249             throw new RuntimeException( e );
250         }
251         finally
252         {
253             // kill our ThreadPool monitors if existent
254             if ( timer != null ) timer.cancel();
255 
256             try
257             {
258                 // shutdown the Read-Process-ThreadPool if existent
259                 if ( this._readpool != null )
260                 {
261                     this._readpool.stop();
262                 }
263             }
264             catch ( Exception e )
265             {
266                 logger.log( Level.SEVERE, "!!! Error while stopping server !!!", e );
267             }
268             try
269             {
270                 // shutdown the Write-ThreadPool
271                 this._writePool.stop();
272             }
273             catch ( Exception e )
274             {
275                 logger.log( Level.SEVERE, "!!! Error while stopping server !!!", e );
276             }
277 
278             // and finally close the global Selector
279             IOUtil.closeQuiet( this._selector );
280         }
281     }
282 
283     /***
284      * Registers all temporalily queued socket channels on the used {@link Selector}
285      * 
286      * @throws IOException
287      */
288     private void registerAspirants() throws IOException
289     {
290         int count = 0;
291         int size = this._aspirants.size();
292 
293         if ( size > 0 )
294         {
295             // clean out cancelled key list
296             this._selector.selectNow();
297             Object[] arr = null;
298             ConnectionHeader header = null;
299             SocketChannel channel = null;
300             SelectionKey sk = null;
301 
302             for ( ; count < size; count++ )
303             {
304                 arr = (Object[]) this._aspirants.remove( 0 );
305                 header = (ConnectionHeader) arr[0];
306                 channel = header.getChannel();
307                 // set a previously blocking socket channel to non-blocking
308                 // mode to allow selector operations on it
309                 if ( channel.isBlocking() )
310                 {
311                     logger.log( Level.FINEST,
312                                 "Setting socket temporarily to non-blocking until further connection processing..." );
313                     channel.configureBlocking( false );
314                 }
315 
316                 sk = channel.keyFor( this._selector );
317                 if ( sk == null )
318                 {
319                     // register with no interest
320                     sk = channel.register( this._selector, 0 );
321                 }
322 
323                 // attach the client connection header
324                 sk.attach( header );
325                 // now set the requested interest
326                 sk.interestOps( ((Integer) arr[1]).intValue() );
327             }
328         }
329 
330         if ( logger.isLoggable( Level.FINEST ) )
331         {
332             logger.log( Level.FINEST, System.currentTimeMillis() + " - registered " + count + " aspirants..." );
333         }
334     }
335 
336     /*
337      * (non-Javadoc)
338      * 
339      * @see de.netseeker.ejoe.io.ReadWriteChannelRegistrar#register(de.netseeker.ejoe.ConnectionHeader, int)
340      */
341     public void register( ConnectionHeader clientInfo, int interest )
342     {
343         // store the connection header with it's contained socket channel in
344         // the global queue
345         this._aspirants
346                 .add( new Object[] { clientInfo, (interest == SelectionKey.OP_READ) ? INT_OP_READ : INT_OP_WRITE } );
347 
348         // break blocking select operations in the selector to allow
349         // registration of the socket channels within the registration queue
350         // after processing all currently selected socket channels
351         this._selector.wakeup();
352     }
353 
354     /*
355      * (non-Javadoc)
356      * 
357      * @see de.netseeker.ejoe.ChannelRegistrar#isValid()
358      */
359     public boolean isValid()
360     {
361         return isAlive() && !isInterrupted();
362     }
363 
364     /*
365      * (non-Javadoc)
366      * 
367      * @see de.netseeker.ejoe.core.ChannelRegistrar#getLoad()
368      */
369     public int getLoad()
370     {
371         return this._load;
372     }
373 }