1 /**********************************************************************
2 * CombinedConnectionProcessor.java
3 * created on 01.03.2005 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/core/CombinedConnectionProcessor.java,v $
5 * $Date: 2007/03/22 21:01:28 $
6 * $Revision: 1.3 $
7 *
8 * ====================================================================
9 *
10 * Copyright 2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the ejoe framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30 package de.netseeker.ejoe.core;
31
32 import java.io.IOException;
33 import java.nio.channels.CancelledKeyException;
34 import java.nio.channels.SelectionKey;
35 import java.nio.channels.Selector;
36 import java.nio.channels.SocketChannel;
37 import java.util.ArrayList;
38 import java.util.Collections;
39 import java.util.Iterator;
40 import java.util.List;
41 import java.util.Set;
42 import java.util.Timer;
43 import java.util.logging.Level;
44 import java.util.logging.Logger;
45
46 import de.netseeker.ejoe.ConnectionHeader;
47 import de.netseeker.ejoe.EJConstants;
48 import de.netseeker.ejoe.ServerInfo;
49 import de.netseeker.ejoe.concurrent.ThreadPoolFactory;
50 import de.netseeker.ejoe.concurrent.ThreadPoolResizer;
51 import de.netseeker.ejoe.concurrent.ThreadService;
52 import de.netseeker.ejoe.io.IOUtil;
53
54 /***
55 * The CombinedConnectionProcessor handles the further processing of accepted client connections. It simply does only
56 * separate the readable connections from the incoming flood of accepted connections and schedules a new
57 * ConnectionProcessor which will handle read/write operations and adapter invoking for each of the selected
58 * connections. This is compareable to the *old BIO style* where one thread is used for each connection.
59 *
60 * @author netseeker
61 * @since 0.3.5
62 */
63 public final class CombinedConnectionProcessor extends Thread implements ChannelRegistrar
64 {
65 private static final Logger logger = Logger.getLogger( ConnectionAcceptor.class.getName() );
66
67 private static final Integer INT_OP_WRITE = new Integer( SelectionKey.OP_WRITE );
68
69 private static final Integer INT_OP_READ = new Integer( SelectionKey.OP_READ );
70
71 private ServerInfo _serverInfo;
72
73 private ThreadGroup _readGroup, _writeGroup;
74
75 private ThreadService _readpool, _writePool;
76
77 private Selector _selector;
78
79 private List _aspirants;
80
81 private int _load = 0;
82
83 /***
84 * Creates a new CombinedConnectionProcessor instance. Using this constructor will force the newly created instance
85 * to use a non-blocking threadpool for invocation of read processor threads.
86 *
87 * @param serverInfo prefilled ConnectionHeader container server settings, eg. compression setting
88 * @throws IOException
89 */
90 public CombinedConnectionProcessor(ServerInfo serverInfo) throws IOException
91 {
92 super( "EJOE CombinedConnectionProcessor" );
93 this._serverInfo = serverInfo;
94
95
96 this._readGroup = new ThreadGroup( "EJOE_RP_THREADS" );
97 this._writeGroup = new ThreadGroup( "EJOE_W_THREADS" );
98
99 this._aspirants = Collections.synchronizedList( new ArrayList( 64 ) );
100
101
102 this._selector = Selector.open();
103
104
105 this._selector.selectNow();
106 }
107
108
109
110
111
112
113 public void run()
114 {
115 Timer timer = null;
116
117 this._readpool = ThreadPoolFactory.createFixedThreadPool( this._serverInfo.getMaxReadProcessors(),
118 this._readGroup );
119
120 this._writePool = ThreadPoolFactory.createFixedThreadPool( this._serverInfo.getMaxWriteProcessors(),
121 this._writeGroup );
122
123
124
125
126 if ( this._serverInfo.isAutomaticThreadPoolResize() )
127 {
128 timer = new Timer();
129
130
131 timer.schedule( new ThreadPoolResizer( this._readpool, this._serverInfo.getMaxReadProcessors(),
132 EJConstants.EJOE_POOL_RESIZER_SHRINKWAIT ), this._serverInfo
133 .getPoolResizePeriod(), this._serverInfo.getPoolResizePeriod() );
134
135 timer.schedule( new ThreadPoolResizer( this._writePool, this._serverInfo.getMaxWriteProcessors(),
136 EJConstants.EJOE_POOL_RESIZER_SHRINKWAIT ), this._serverInfo
137 .getPoolResizePeriod(), this._serverInfo.getPoolResizePeriod() );
138 }
139
140 try
141 {
142 Iterator it;
143 SocketChannel cChannel;
144 SelectionKey selKey;
145 ConnectionHeader clientInfo;
146 Set keys;
147
148 while ( !isInterrupted() )
149 {
150 this._load = 0;
151
152
153 registerAspirants();
154
155
156
157 if ( _selector.select() == 0 ) continue;
158
159 keys = this._selector.selectedKeys();
160 this._load = keys.size();
161 it = this._selector.selectedKeys().iterator();
162
163
164
165 while ( it.hasNext() && !isInterrupted() )
166 {
167 selKey = (SelectionKey) it.next();
168
169
170 it.remove();
171
172 try
173 {
174
175 if ( !selKey.isValid() ) continue;
176
177
178
179 clientInfo = (ConnectionHeader) selKey.attachment();
180
181
182 if ( selKey.isReadable() )
183 {
184
185 cChannel = (SocketChannel) selKey.channel();
186
187
188 selKey.cancel();
189
190
191 if ( cChannel != null && cChannel.isOpen() )
192 {
193
194 if ( !this._serverInfo.hasNonBlockingReadWrite() )
195 {
196 logger.log( Level.FINEST,
197 "Setting socket to blocking mode for further io operations..." );
198
199
200
201 cChannel.configureBlocking( true );
202 }
203
204
205 this._readpool.invokeLater( new ConnectionReader( this, this._serverInfo, clientInfo ) );
206 }
207 }
208 else if ( selKey.isWritable() )
209 {
210
211 cChannel = (SocketChannel) selKey.channel();
212
213
214 selKey.cancel();
215
216
217 if ( cChannel != null && cChannel.isOpen() )
218 {
219
220 if ( !this._serverInfo.hasNonBlockingReadWrite() )
221 {
222 if ( !clientInfo.hasAttachment() ) continue;
223
224
225 cChannel.configureBlocking( true );
226 }
227
228
229
230 this._writePool
231 .invokeLater( new ConnectionWriter( this, this._serverInfo, clientInfo ) );
232 }
233 }
234 }
235 catch ( CancelledKeyException cke )
236 {
237 logger.log( Level.WARNING, "Key cancelled!", cke );
238 }
239 finally
240 {
241 this._load--;
242 }
243 }
244 }
245 }
246 catch ( IOException e )
247 {
248 logger.log( Level.SEVERE, "!!! IOException occured !!! ", e );
249 throw new RuntimeException( e );
250 }
251 finally
252 {
253
254 if ( timer != null ) timer.cancel();
255
256 try
257 {
258
259 if ( this._readpool != null )
260 {
261 this._readpool.stop();
262 }
263 }
264 catch ( Exception e )
265 {
266 logger.log( Level.SEVERE, "!!! Error while stopping server !!!", e );
267 }
268 try
269 {
270
271 this._writePool.stop();
272 }
273 catch ( Exception e )
274 {
275 logger.log( Level.SEVERE, "!!! Error while stopping server !!!", e );
276 }
277
278
279 IOUtil.closeQuiet( this._selector );
280 }
281 }
282
283 /***
284 * Registers all temporalily queued socket channels on the used {@link Selector}
285 *
286 * @throws IOException
287 */
288 private void registerAspirants() throws IOException
289 {
290 int count = 0;
291 int size = this._aspirants.size();
292
293 if ( size > 0 )
294 {
295
296 this._selector.selectNow();
297 Object[] arr = null;
298 ConnectionHeader header = null;
299 SocketChannel channel = null;
300 SelectionKey sk = null;
301
302 for ( ; count < size; count++ )
303 {
304 arr = (Object[]) this._aspirants.remove( 0 );
305 header = (ConnectionHeader) arr[0];
306 channel = header.getChannel();
307
308
309 if ( channel.isBlocking() )
310 {
311 logger.log( Level.FINEST,
312 "Setting socket temporarily to non-blocking until further connection processing..." );
313 channel.configureBlocking( false );
314 }
315
316 sk = channel.keyFor( this._selector );
317 if ( sk == null )
318 {
319
320 sk = channel.register( this._selector, 0 );
321 }
322
323
324 sk.attach( header );
325
326 sk.interestOps( ((Integer) arr[1]).intValue() );
327 }
328 }
329
330 if ( logger.isLoggable( Level.FINEST ) )
331 {
332 logger.log( Level.FINEST, System.currentTimeMillis() + " - registered " + count + " aspirants..." );
333 }
334 }
335
336
337
338
339
340
341 public void register( ConnectionHeader clientInfo, int interest )
342 {
343
344
345 this._aspirants
346 .add( new Object[] { clientInfo, (interest == SelectionKey.OP_READ) ? INT_OP_READ : INT_OP_WRITE } );
347
348
349
350
351 this._selector.wakeup();
352 }
353
354
355
356
357
358
359 public boolean isValid()
360 {
361 return isAlive() && !isInterrupted();
362 }
363
364
365
366
367
368
369 public int getLoad()
370 {
371 return this._load;
372 }
373 }