View Javadoc

1   /**********************************************************************
2    * ConnectionAcceptor.java
3    * created on 01.03.2005 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/core/ConnectionAcceptor.java,v $
5    * $Date: 2007/03/29 15:46:50 $
6    * $Revision: 1.4 $
7    *
8    * ====================================================================
9    *
10   *  Copyright 2005-2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the ejoe framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  package de.netseeker.ejoe.core;
31  
32  import java.io.IOException;
33  import java.nio.channels.CancelledKeyException;
34  import java.nio.channels.SelectionKey;
35  import java.nio.channels.Selector;
36  import java.nio.channels.ServerSocketChannel;
37  import java.nio.channels.SocketChannel;
38  import java.util.Iterator;
39  import java.util.logging.Level;
40  import java.util.logging.Logger;
41  
42  import de.netseeker.ejoe.ConnectionHeader;
43  import de.netseeker.ejoe.EJConstants;
44  import de.netseeker.ejoe.io.IOUtil;
45  
46  /***
47   * The ConnectionAcceptor class is a server thread handling accepting of incoming client connections and simply hands
48   * over these connections to another thread for further processing. The adavantage of the separation of acception
49   * management and processing management *can* result in a significantly higher throughput if dealing with a lot of
50   * clients.
51   * 
52   * @author netseeker
53   * @since 0.3.0
54   */
55  public final class ConnectionAcceptor extends Thread
56  {
57      private static final Logger logger = Logger.getLogger( ConnectionAcceptor.class.getName() );
58  
59      private ServerSocketChannel _channel;
60  
61      private ChannelRegistrar[]  _registrars;
62  
63      private Selector            _selector;
64  
65      public ConnectionAcceptor(final ServerSocketChannel channel, final ChannelRegistrar registrars[])
66              throws IOException
67      {
68          super( "EJOE ConnectionAcceptor" );
69          this._channel = channel;
70          this._registrars = registrars;
71          this._selector = Selector.open();
72          // clean out previously cancelled keys - system might reuse selectors
73          this._selector.selectNow();
74      }
75  
76      /*
77       * (non-Javadoc)
78       * 
79       * @see java.lang.Thread#run()
80       */
81      public void run()
82      {
83          try
84          {
85              // register our server socket on the selector
86              this._channel.register( _selector, SelectionKey.OP_ACCEPT );
87  
88              while ( !isInterrupted() )
89              {
90                  // just do endless select operations until one or more intersted
91                  // socket channels will be found
92                  if ( _selector.select() == 0 )
93                  {
94                      continue;
95                  }
96  
97                  Iterator it = _selector.selectedKeys().iterator();
98                  SelectionKey selKey = null;
99                  SocketChannel cChannel = null;
100                 ConnectionHeader header = null;
101 
102                 while ( it.hasNext() && !isInterrupted() )
103                 {
104                     selKey = (SelectionKey) it.next();
105                     it.remove();
106 
107                     try
108                     {
109                         // validate the key
110                         if ( !selKey.isValid() )
111                         {
112                             continue;
113                         }
114 
115                         // socket is ready for establishing a connection?
116                         if ( selKey.isAcceptable() )
117                         {
118                             // establish the connection
119                             cChannel = this._channel.accept();
120 
121                             // a little bit paranoia
122                             if ( cChannel != null )
123                             {
124                                 // little bit tuning
125                                 cChannel.socket().setTrafficClass( EJConstants.IPTOS_THROUGHPUT );
126                                 cChannel.socket().setReuseAddress( true );
127                                 cChannel.socket().setSoLinger( false, 0 );
128                                 
129                                 // ensure that the socket channel is prepared
130                                 // for non-blocking operations
131                                 cChannel.configureBlocking( false );
132                                 // create a new ConnectionHeader for all
133                                 // upcoming operations on the
134                                 // accepted socket connection
135                                 header = new ConnectionHeader( cChannel, cChannel.socket().getRemoteSocketAddress()
136                                         .toString(), true );
137                                 // register the channel on another thread which
138                                 // will do further connection handling
139                                 getProcessor().register( header, SelectionKey.OP_READ );
140 
141                                 if ( logger.isLoggable( Level.FINE ) )
142                                 {
143                                     logger.log( Level.FINE, "Connection accepted from "
144                                             + cChannel.socket().getRemoteSocketAddress() );
145                                 }
146                             }
147                         }
148                     }
149                     catch ( CancelledKeyException cke )
150                     {
151                         logger.log( Level.WARNING, "Key canceled!", cke );
152                     }
153                 }
154             }
155         }
156         catch ( IOException e )
157         {
158             logger.log( Level.SEVERE, "!!! IOException occured !!! ", e );
159         }
160         finally
161         {
162             try
163             {
164                 IOUtil.closeQuiet( _selector );
165             }
166             catch ( Exception e )
167             {
168                 logger.log( Level.SEVERE, "!!! Error while stopping server !!!", e );
169             }
170         }
171     }
172 
173     /***
174      * @return
175      */
176     private ChannelRegistrar getProcessor()
177     {
178         ChannelRegistrar rProcessor = null;
179         
180         if ( this._registrars.length == 1 )
181         {
182             rProcessor = this._registrars[0];
183         }
184         else
185         {
186             int load = -1, pLoad = 0;
187             ChannelRegistrar tProcessor;
188 
189             for ( int i = 0; i < this._registrars.length; i++ )
190             {
191                 tProcessor = this._registrars[i];
192                 pLoad = tProcessor.getLoad();
193                 if ( pLoad < load || load == -1 )
194                 {
195                     rProcessor = tProcessor;
196                 }
197             }
198         }
199 
200         return rProcessor;
201     }
202 }