1 /**********************************************************************
2 * ConnectionProcessorManager.java
3 * created on 01.03.2005 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/ConnectionProcessorManager.java,v $
5 * $Date: 2006/02/04 16:16:48 $
6 * $Revision: 1.29 $
7 *********************************************************************/
8 package de.netseeker.ejoe;
9
10 import java.io.IOException;
11 import java.nio.channels.CancelledKeyException;
12 import java.nio.channels.SelectionKey;
13 import java.nio.channels.Selector;
14 import java.nio.channels.SocketChannel;
15 import java.nio.channels.spi.SelectorProvider;
16 import java.util.Iterator;
17 import java.util.LinkedHashMap;
18 import java.util.Map;
19 import java.util.logging.Level;
20 import java.util.logging.Logger;
21
22 import de.netseeker.ejoe.concurrent.ThreadPool;
23 import de.netseeker.ejoe.handler.ServerHandler;
24 import de.netseeker.ejoe.io.IOUtils;
25
/**
 * The ConnectionProcessorManager handles the further processing of accepted client connections. It merely
 * separates the readable connections from the incoming flood of accepted connections and schedules a new
 * ConnectionProcessor which will handle read/write operations and adapter invocation for each of the selected
 * connections. This is comparable to the *old BIO style* where one thread is used for each connection.
 *
 * @author netseeker
 */
34 final class ConnectionProcessorManager extends Thread implements ChannelRegistrar
35 {
36 private static final Logger logger = Logger.getLogger(ConnectionAcceptor.class.getName());
37
38 private ConnectionHeader _serverInfo;
39
40 private final ServerHandler _handler;
41
42 private final int _maxReadProcessors, _maxWriteProcessors;
43
44 private boolean _useThreadedProcessors = false;
45
46 private ThreadGroup _readGroup, _writeGroup;
47
48 private ThreadPool _readpool, _writePool;
49
50 private Selector _selector;
51
52 private Map _aspirants;
53
54 /***
55 * Creates a new ConnectionProcessorManager instance. Using this constructor will force the newly created instance
56 * to use a non-blocking threadpool for invocation of read processor threads.
57 *
58 * @param serverInfo
59 * prefilled ConnectionHeader container server settings, eg. compression setting
60 * @param handler
61 * server handler to use when processing retrieved data
62 * @param adapter
63 * (de)serialize adapter to use when deserializing data during read or rather serializing data during
64 * write operations
65 * @param maxReadProcessors
66 * amount of read processors threads to use for reading data and processing these retrieved data
67 * @param maxWriteProcessors
68 * amount of writer threads to use for sending server responses to clients
69 * @throws IOException
70 */
71 public ConnectionProcessorManager(ConnectionHeader serverInfo, final ServerHandler handler, int maxReadProcessors,
72 int maxWriteProcessors) throws IOException
73 {
74 this._serverInfo = serverInfo;
75 this._handler = handler;
76 this._maxReadProcessors = maxReadProcessors;
77 this._maxWriteProcessors = maxWriteProcessors;
78 this._readGroup = new ThreadGroup("EJOE_RP_THREADS");
79 this._writeGroup = new ThreadGroup("EJOE_W_THREADS");
80 this._selector = SelectorProvider.provider().openSelector();
81 this._aspirants = new LinkedHashMap(Math.round((float) ((maxReadProcessors + maxWriteProcessors) * 1.5)));
82 }
83
84 /***
85 * Creates a new ConnectionProcessorManager instance
86 *
87 * @param serverInfo
88 * prefilled ConnectionHeader container server settings, eg. compression setting
89 * @param handler
90 * server handler to use when processing retrieved data
91 * @param adapter
92 * (de)serialize adapter to use when deserializing data during read or rather serializing data during
93 * write operations
94 * @param maxReadProcessors
95 * amount of read processors threads to use for reading data and processing these retrieved data
96 * @param maxWriteProcessors
97 * amount of writer threads to use for sending server responses to clients
98 * @param useThreadedReadProcessors
99 * whether to use a blocking thread start mechanism, which will invoke new read processor threads not
100 * until the amount of concurrently running read processors is lower than <var>maxReadProcessors </var>
101 * @throws IOException
102 */
103 public ConnectionProcessorManager(ConnectionHeader serverInfo, final ServerHandler handler, int maxProcessors,
104 int maxWriteProcessors, boolean useThreadedReadProcessors) throws IOException
105 {
106 this(serverInfo, handler, maxProcessors, maxWriteProcessors);
107 this._useThreadedProcessors = useThreadedReadProcessors;
108 }
109
110
111
112
113
114
115 public void run()
116 {
117 if (!this._useThreadedProcessors)
118 {
119 this._readpool = new ThreadPool(this._readGroup, this._maxReadProcessors);
120 }
121
122 this._writePool = new ThreadPool(this._writeGroup, this._maxWriteProcessors);
123
124 try
125 {
126 registerAspirants();
127
128 while (!isInterrupted())
129 {
130 this._selector.select();
131 registerAspirants();
132 Iterator it = this._selector.selectedKeys().iterator();
133 SocketChannel cChannel;
134
135 while (it.hasNext() && !isInterrupted())
136 {
137 SelectionKey selKey = (SelectionKey) it.next();
138 it.remove();
139
140 try
141 {
142
143 if (!selKey.isValid())
144 {
145 continue;
146 }
147
148 ConnectionHeader clientInfo = (ConnectionHeader) selKey.attachment();
149
150 if (selKey.isReadable())
151 {
152 cChannel = (SocketChannel) selKey.channel();
153 selKey.cancel();
154
155 if (cChannel != null)
156 {
157 if (!this._serverInfo.hasNonBlockingReadWrite())
158 {
159 logger.log(Level.FINEST,
160 "Setting socket to blocking mode for further io operations...");
161 cChannel.configureBlocking(true);
162 }
163
164 invokeReadOperation(new ConnectionReader(this, this._serverInfo, clientInfo,
165 this._handler));
166 }
167 }
168 else if (selKey.isWritable())
169 {
170 cChannel = (SocketChannel) selKey.channel();
171 selKey.cancel();
172
173 if (cChannel != null)
174 {
175 if (!this._serverInfo.hasNonBlockingReadWrite())
176 {
177 cChannel.configureBlocking(true);
178 }
179
180 this._writePool.invokeLater(new ConnectionWriter(this, this._serverInfo, clientInfo));
181 }
182 }
183 }
184 catch (CancelledKeyException cke)
185 {
186 logger.log(Level.WARNING, "Key cancelled!", cke);
187 }
188 }
189 }
190 }
191 catch (IOException e)
192 {
193 logger.log(Level.SEVERE, "!!! IOException occured !!! ", e);
194 throw new RuntimeException(e);
195 }
196 finally
197 {
198 try
199 {
200 if (this._readpool != null)
201 {
202 this._readpool.stop();
203 }
204 }
205 catch (Exception e)
206 {
207 logger.log(Level.SEVERE, "!!! Error while stopping server !!!", e);
208 }
209 try
210 {
211 this._writePool.stop();
212 }
213 catch (Exception e)
214 {
215 logger.log(Level.SEVERE, "!!! Error while stopping server !!!", e);
216 }
217
218 IOUtils.closeQuite(this._selector);
219 }
220 }
221
222 /***
223 * @param runnable
224 */
225 private void invokeReadOperation(Runnable runnable)
226 {
227 if (!isInterrupted())
228 {
229 if (!this._useThreadedProcessors)
230 {
231 this._readpool.invokeLater(runnable);
232 }
233 else
234 {
235 try
236 {
237 synchronized (this._readGroup)
238 {
239 while (this._readGroup.activeCount() >= this._maxReadProcessors)
240 {
241 this._readGroup.wait(EJConstants.EJOE_WAIT_TIMEOUT);
242 }
243 }
244 }
245 catch (InterruptedException e)
246 {
247 return;
248 }
249
250 new Thread(this._readGroup, runnable).start();
251 }
252 }
253 }
254
255 private void registerAspirants() throws IOException
256 {
257 int count = 0;
258
259 synchronized (this._aspirants)
260 {
261 if (!this._aspirants.isEmpty())
262 {
263
264
265 this._selector.selectNow();
266 Iterator ait = this._aspirants.keySet().iterator();
267 ConnectionHeader header = null;
268 SocketChannel channel = null;
269 while (ait.hasNext())
270 {
271 header = (ConnectionHeader) ait.next();
272 channel = (SocketChannel)header.getChannel();
273 if (channel.isBlocking())
274 {
275 logger.log(Level.FINEST,
276 "Setting socket temporarily to non-blocking until further connection processing...");
277 channel.configureBlocking(false);
278 }
279 channel.register(this._selector, ((Integer) this._aspirants.get(header)).intValue(), header);
280 ait.remove();
281 count++;
282 }
283 }
284
285 if (logger.isLoggable(Level.FINEST))
286 {
287 logger.log(Level.FINEST, System.currentTimeMillis() + " - registered " + count + " aspirants...");
288 }
289 }
290 }
291
292
293
294
295
296
297 public void register(ConnectionHeader clientInfo, int interest)
298 {
299 synchronized (this._aspirants)
300 {
301 this._aspirants.put(clientInfo, new Integer(interest));
302 this._selector.wakeup();
303 }
304 }
305
306
307
308
309
310
311 public boolean isValid()
312 {
313 return isAlive() && !isInterrupted();
314 }
315 }