1 /**********************************************************************
2 * ConnectionAcceptor.java
3 * created on 01.03.2005 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/core/ConnectionAcceptor.java,v $
5 * $Date: 2007/03/29 15:46:50 $
6 * $Revision: 1.4 $
7 *
8 * ====================================================================
9 *
10 * Copyright 2005-2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the ejoe framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30 package de.netseeker.ejoe.core;
31
32 import java.io.IOException;
33 import java.nio.channels.CancelledKeyException;
34 import java.nio.channels.SelectionKey;
35 import java.nio.channels.Selector;
36 import java.nio.channels.ServerSocketChannel;
37 import java.nio.channels.SocketChannel;
38 import java.util.Iterator;
39 import java.util.logging.Level;
40 import java.util.logging.Logger;
41
42 import de.netseeker.ejoe.ConnectionHeader;
43 import de.netseeker.ejoe.EJConstants;
44 import de.netseeker.ejoe.io.IOUtil;
45
46 /***
47 * The ConnectionAcceptor class is a server thread handling accepting of incoming client connections and simply hands
48 * over these connections to another thread for further processing. The adavantage of the separation of acception
49 * management and processing management *can* result in a significantly higher throughput if dealing with a lot of
50 * clients.
51 *
52 * @author netseeker
53 * @since 0.3.0
54 */
55 public final class ConnectionAcceptor extends Thread
56 {
57 private static final Logger logger = Logger.getLogger( ConnectionAcceptor.class.getName() );
58
59 private ServerSocketChannel _channel;
60
61 private ChannelRegistrar[] _registrars;
62
63 private Selector _selector;
64
65 public ConnectionAcceptor(final ServerSocketChannel channel, final ChannelRegistrar registrars[])
66 throws IOException
67 {
68 super( "EJOE ConnectionAcceptor" );
69 this._channel = channel;
70 this._registrars = registrars;
71 this._selector = Selector.open();
72
73 this._selector.selectNow();
74 }
75
76
77
78
79
80
81 public void run()
82 {
83 try
84 {
85
86 this._channel.register( _selector, SelectionKey.OP_ACCEPT );
87
88 while ( !isInterrupted() )
89 {
90
91
92 if ( _selector.select() == 0 )
93 {
94 continue;
95 }
96
97 Iterator it = _selector.selectedKeys().iterator();
98 SelectionKey selKey = null;
99 SocketChannel cChannel = null;
100 ConnectionHeader header = null;
101
102 while ( it.hasNext() && !isInterrupted() )
103 {
104 selKey = (SelectionKey) it.next();
105 it.remove();
106
107 try
108 {
109
110 if ( !selKey.isValid() )
111 {
112 continue;
113 }
114
115
116 if ( selKey.isAcceptable() )
117 {
118
119 cChannel = this._channel.accept();
120
121
122 if ( cChannel != null )
123 {
124
125 cChannel.socket().setTrafficClass( EJConstants.IPTOS_THROUGHPUT );
126 cChannel.socket().setReuseAddress( true );
127 cChannel.socket().setSoLinger( false, 0 );
128
129
130
131 cChannel.configureBlocking( false );
132
133
134
135 header = new ConnectionHeader( cChannel, cChannel.socket().getRemoteSocketAddress()
136 .toString(), true );
137
138
139 getProcessor().register( header, SelectionKey.OP_READ );
140
141 if ( logger.isLoggable( Level.FINE ) )
142 {
143 logger.log( Level.FINE, "Connection accepted from "
144 + cChannel.socket().getRemoteSocketAddress() );
145 }
146 }
147 }
148 }
149 catch ( CancelledKeyException cke )
150 {
151 logger.log( Level.WARNING, "Key canceled!", cke );
152 }
153 }
154 }
155 }
156 catch ( IOException e )
157 {
158 logger.log( Level.SEVERE, "!!! IOException occured !!! ", e );
159 }
160 finally
161 {
162 try
163 {
164 IOUtil.closeQuiet( _selector );
165 }
166 catch ( Exception e )
167 {
168 logger.log( Level.SEVERE, "!!! Error while stopping server !!!", e );
169 }
170 }
171 }
172
173 /***
174 * @return
175 */
176 private ChannelRegistrar getProcessor()
177 {
178 ChannelRegistrar rProcessor = null;
179
180 if ( this._registrars.length == 1 )
181 {
182 rProcessor = this._registrars[0];
183 }
184 else
185 {
186 int load = -1, pLoad = 0;
187 ChannelRegistrar tProcessor;
188
189 for ( int i = 0; i < this._registrars.length; i++ )
190 {
191 tProcessor = this._registrars[i];
192 pLoad = tProcessor.getLoad();
193 if ( pLoad < load || load == -1 )
194 {
195 rProcessor = tProcessor;
196 }
197 }
198 }
199
200 return rProcessor;
201 }
202 }