View Javadoc

1   /**********************************************************************
2    * ConnectionProcessorManager.java
3    * created on 01.03.2005 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/ConnectionProcessorManager.java,v $
5    * $Date: 2006/02/04 16:16:48 $
6    * $Revision: 1.29 $
7    *********************************************************************/
8   package de.netseeker.ejoe;
9   
10  import java.io.IOException;
11  import java.nio.channels.CancelledKeyException;
12  import java.nio.channels.SelectionKey;
13  import java.nio.channels.Selector;
14  import java.nio.channels.SocketChannel;
15  import java.nio.channels.spi.SelectorProvider;
16  import java.util.Iterator;
17  import java.util.LinkedHashMap;
18  import java.util.Map;
19  import java.util.logging.Level;
20  import java.util.logging.Logger;
21  
22  import de.netseeker.ejoe.concurrent.ThreadPool;
23  import de.netseeker.ejoe.handler.ServerHandler;
24  import de.netseeker.ejoe.io.IOUtils;
25  
26  /***
27   * The ConnectionProcessorManager handles the further processing of accepted client connections. It simply does only
28   * separate the readable connections from the incoming flood of accepted connections and schedules a new
29   * ConnectionProcessor which will handle read/write operations and adapter invoking for each of the selected
30   * connections. This is compareable to the *old BIO style* where one thread is used for each connection.
31   *
32   * @author netseeker
33   */
34  final class ConnectionProcessorManager extends Thread implements ChannelRegistrar
35  {
36  	private static final Logger	logger					= Logger.getLogger(ConnectionAcceptor.class.getName());
37  
38  	private ConnectionHeader	_serverInfo;
39  
40  	private final ServerHandler	_handler;
41  
42  	private final int			_maxReadProcessors, _maxWriteProcessors;
43  
44  	private boolean				_useThreadedProcessors	= false;
45  
46  	private ThreadGroup			_readGroup, _writeGroup;
47  
48  	private ThreadPool			_readpool, _writePool;
49  
50  	private Selector			_selector;
51  
52  	private Map					_aspirants;
53  
54  	/***
55  	 * Creates a new ConnectionProcessorManager instance. Using this constructor will force the newly created instance
56  	 * to use a non-blocking threadpool for invocation of read processor threads.
57  	 *
58  	 * @param serverInfo
59  	 *            prefilled ConnectionHeader container server settings, eg. compression setting
60  	 * @param handler
61  	 *            server handler to use when processing retrieved data
62  	 * @param adapter
63  	 *            (de)serialize adapter to use when deserializing data during read or rather serializing data during
64  	 *            write operations
65  	 * @param maxReadProcessors
66  	 *            amount of read processors threads to use for reading data and processing these retrieved data
67  	 * @param maxWriteProcessors
68  	 *            amount of writer threads to use for sending server responses to clients
69  	 * @throws IOException
70  	 */
71  	public ConnectionProcessorManager(ConnectionHeader serverInfo, final ServerHandler handler, int maxReadProcessors,
72  			int maxWriteProcessors) throws IOException
73  	{
74  		this._serverInfo = serverInfo;
75  		this._handler = handler;
76  		this._maxReadProcessors = maxReadProcessors;
77  		this._maxWriteProcessors = maxWriteProcessors;
78  		this._readGroup = new ThreadGroup("EJOE_RP_THREADS");
79  		this._writeGroup = new ThreadGroup("EJOE_W_THREADS");
80  		this._selector = SelectorProvider.provider().openSelector();
81  		this._aspirants = new LinkedHashMap(Math.round((float) ((maxReadProcessors + maxWriteProcessors) * 1.5)));
82  	}
83  
84  	/***
85  	 * Creates a new ConnectionProcessorManager instance
86  	 *
87  	 * @param serverInfo
88  	 *            prefilled ConnectionHeader container server settings, eg. compression setting
89  	 * @param handler
90  	 *            server handler to use when processing retrieved data
91  	 * @param adapter
92  	 *            (de)serialize adapter to use when deserializing data during read or rather serializing data during
93  	 *            write operations
94  	 * @param maxReadProcessors
95  	 *            amount of read processors threads to use for reading data and processing these retrieved data
96  	 * @param maxWriteProcessors
97  	 *            amount of writer threads to use for sending server responses to clients
98  	 * @param useThreadedReadProcessors
99  	 *            whether to use a blocking thread start mechanism, which will invoke new read processor threads not
100 	 *            until the amount of concurrently running read processors is lower than <var>maxReadProcessors </var>
101 	 * @throws IOException
102 	 */
103 	public ConnectionProcessorManager(ConnectionHeader serverInfo, final ServerHandler handler, int maxProcessors,
104 			int maxWriteProcessors, boolean useThreadedReadProcessors) throws IOException
105 	{
106 		this(serverInfo, handler, maxProcessors, maxWriteProcessors);
107 		this._useThreadedProcessors = useThreadedReadProcessors;
108 	}
109 
110 	/*
111 	 * (non-Javadoc)
112 	 *
113 	 * @see java.lang.Thread#run()
114 	 */
115 	public void run()
116 	{
117 		if (!this._useThreadedProcessors)
118 		{
119 			this._readpool = new ThreadPool(this._readGroup, this._maxReadProcessors);
120 		}
121 
122 		this._writePool = new ThreadPool(this._writeGroup, this._maxWriteProcessors);
123 
124 		try
125 		{
126 			registerAspirants();
127 
128 			while (!isInterrupted())
129 			{
130 				this._selector.select();
131 				registerAspirants();
132 				Iterator it = this._selector.selectedKeys().iterator();
133 				SocketChannel cChannel;
134 
135 				while (it.hasNext() && !isInterrupted())
136 				{
137 					SelectionKey selKey = (SelectionKey) it.next();
138 					it.remove();
139 
140 					try
141 					{
142 						// validate the key
143 						if (!selKey.isValid())
144 						{
145 							continue;
146 						}
147 
148 						ConnectionHeader clientInfo = (ConnectionHeader) selKey.attachment();
149 
150 						if (selKey.isReadable())
151 						{
152 							cChannel = (SocketChannel) selKey.channel();
153 							selKey.cancel();
154 
155 							if (cChannel != null)
156 							{
157 								if (!this._serverInfo.hasNonBlockingReadWrite())
158 								{
159 									logger.log(Level.FINEST,
160 											"Setting socket to blocking mode for further io operations...");
161 									cChannel.configureBlocking(true);
162 								}
163 
164 								invokeReadOperation(new ConnectionReader(this, this._serverInfo, clientInfo,
165 										this._handler));
166 							}
167 						}
168 						else if (selKey.isWritable())
169 						{
170 							cChannel = (SocketChannel) selKey.channel();
171 							selKey.cancel();
172 
173 							if (cChannel != null)
174 							{
175 								if (!this._serverInfo.hasNonBlockingReadWrite())
176 								{
177 									cChannel.configureBlocking(true);
178 								}
179 
180 								this._writePool.invokeLater(new ConnectionWriter(this, this._serverInfo, clientInfo));
181 							}
182 						}
183 					}
184 					catch (CancelledKeyException cke)
185 					{
186 						logger.log(Level.WARNING, "Key cancelled!", cke);
187 					}
188 				}
189 			}
190 		}
191 		catch (IOException e)
192 		{
193 			logger.log(Level.SEVERE, "!!! IOException occured !!! ", e);
194 			throw new RuntimeException(e);
195 		}
196 		finally
197 		{
198 			try
199 			{
200 				if (this._readpool != null)
201 				{
202 					this._readpool.stop();
203 				}
204 			}
205 			catch (Exception e)
206 			{
207 				logger.log(Level.SEVERE, "!!! Error while stopping server !!!", e);
208 			}
209 			try
210 			{
211 				this._writePool.stop();
212 			}
213 			catch (Exception e)
214 			{
215 				logger.log(Level.SEVERE, "!!! Error while stopping server !!!", e);
216 			}
217 
218 			IOUtils.closeQuite(this._selector);
219 		}
220 	}
221 
222 	/***
223 	 * @param runnable
224 	 */
225 	private void invokeReadOperation(Runnable runnable)
226 	{
227 		if (!isInterrupted())
228 		{
229 			if (!this._useThreadedProcessors)
230 			{
231 				this._readpool.invokeLater(runnable);
232 			}
233 			else
234 			{
235 				try
236 				{
237 					synchronized (this._readGroup)
238 					{
239 						while (this._readGroup.activeCount() >= this._maxReadProcessors)
240 						{
241 							this._readGroup.wait(EJConstants.EJOE_WAIT_TIMEOUT);
242 						}
243 					}
244 				}
245 				catch (InterruptedException e)
246 				{
247 					return;
248 				}
249 
250 				new Thread(this._readGroup, runnable).start();
251 			}
252 		}
253 	}
254 
255 	private void registerAspirants() throws IOException
256 	{
257 		int count = 0;
258 
259 		synchronized (this._aspirants)
260 		{
261 			if (!this._aspirants.isEmpty())
262 			{
263 				// clean out cancelled key list before (re-)registering
264 				// new channels
265 				this._selector.selectNow();
266 				Iterator ait = this._aspirants.keySet().iterator();
267 				ConnectionHeader header = null;
268 				SocketChannel channel = null;
269 				while (ait.hasNext())
270 				{
271 					header = (ConnectionHeader) ait.next();
272 					channel = (SocketChannel)header.getChannel();
273 					if (channel.isBlocking())
274 					{
275 						logger.log(Level.FINEST,
276 								"Setting socket temporarily to non-blocking until further connection processing...");
277 						channel.configureBlocking(false);
278 					}
279 					channel.register(this._selector, ((Integer) this._aspirants.get(header)).intValue(), header);
280 					ait.remove();
281 					count++;
282 				}
283 			}
284 
285 			if (logger.isLoggable(Level.FINEST))
286 			{
287 				logger.log(Level.FINEST, System.currentTimeMillis() + " - registered " + count + " aspirants...");
288 			}
289 		}
290 	}
291 
292 	/*
293 	 * (non-Javadoc)
294 	 *
295 	 * @see de.netseeker.ejoe.io.ReadWriteChannelRegistrar#register(de.netseeker.ejoe.ConnectionHeader, int)
296 	 */
297 	public void register(ConnectionHeader clientInfo, int interest)
298 	{
299 		synchronized (this._aspirants)
300 		{
301 			this._aspirants.put(clientInfo, new Integer(interest));
302 			this._selector.wakeup();
303 		}
304 	}
305 
306 	/*
307 	 * (non-Javadoc)
308 	 *
309 	 * @see de.netseeker.ejoe.ChannelRegistrar#isValid()
310 	 */
311 	public boolean isValid()
312 	{
313 		return isAlive() && !isInterrupted();
314 	}
315 }