View Javadoc

1   /**********************************************************************
2    * ConnectionReader.java
3    * created on 05.03.2005 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/ConnectionReader.java,v $
5    * $Date: 2006/03/22 00:06:58 $
6    * $Revision: 1.9 $
7    *
8    * ====================================================================
9    *
10   *  Copyright 2005-2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the ejoe framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  package de.netseeker.ejoe;
31  
32  import java.io.IOException;
33  import java.io.InputStream;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.Channels;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.NonReadableChannelException;
38  import java.nio.channels.ReadableByteChannel;
39  import java.nio.channels.SelectionKey;
40  import java.nio.channels.SocketChannel;
41  import java.util.logging.Level;
42  import java.util.logging.Logger;
43  
44  import de.netseeker.ejoe.adapter.AdapterFactory;
45  import de.netseeker.ejoe.adapter.SerializeAdapter;
46  import de.netseeker.ejoe.handler.ServerHandler;
47  import de.netseeker.ejoe.io.ByteBufferInputStream;
48  import de.netseeker.ejoe.io.ChannelInputStream;
49  import de.netseeker.ejoe.io.IOUtils;
50  import de.netseeker.ejoe.io.IncompleteIOException;
51  
52  /***
53   * ConnectionReader targets two jobs:
54   * <ul>
55   * 	<li>Read (partial) data from a established client connection</li>
56   *  <li>Invoke the server handler to process the received data</li>
57   * </ul>
58   * @author netseeker
59   * @since 0.3.0
60   */
61  final class ConnectionReader implements Runnable
62  {
63  	private static final Logger		log	= Logger.getLogger(ConnectionReader.class.getName());
64  
65  	private final ChannelRegistrar	_registrar;
66  
67  	private final ServerHandler		_handler;
68  
69  	private ConnectionHeader		_senderInfo, _receiverInfo;
70  
71  	/***
72  	 * @param channel a socket channel ready for reading and writing
73  	 * @param adapter (de)serialize adapter
74  	 * @param handler the working horse handling the transported input object
75  	 */
76  	public ConnectionReader(final ChannelRegistrar registrar, ConnectionHeader receiverInfo,
77  			ConnectionHeader senderInfo, final ServerHandler handler)
78  	{
79  		this._registrar = registrar;
80  		this._receiverInfo = receiverInfo;
81  		this._senderInfo = senderInfo;
82  		this._handler = handler;
83  	}
84  
85  	/*
86  	 * (non-Javadoc)
87  	 *
88  	 * @see java.lang.Runnable#run()
89  	 */
90  	public void run()
91  	{
92  		Object request = null;
93  		SocketChannel channel = (SocketChannel) this._senderInfo.getChannel();
94  
95  		try
96  		{
97  			if (this._senderInfo == null || !this._senderInfo.isConnected())
98  			{
99  				log.log(Level.FINEST, "Handshaking client...");
100 				// handshake the client, get infos about compression...
101 				this._senderInfo = IOUtils.handshake(channel, this._receiverInfo, false,
102 						EJConstants.EJOE_CONNECTION_TIMEOUT);
103 			}
104 
105 			if (this._senderInfo != null)
106 			{
107 				log.log(Level.FINEST, "Client requested " + this._senderInfo.getAdapterName() );
108 				SerializeAdapter adapter = AdapterFactory.createAdapter(this._senderInfo.getAdapterName());
109 
110 				if (this._receiverInfo.hasNonBlockingReadWrite())
111 				{
112 					log.log(Level.FINE, "Going to read client request on a non blocking socket...");
113 					request = read(adapter);
114 				}
115 				else
116 				{
117 					log.log(Level.FINE, "Going to read client request on a blocking socket...");
118 					request = readBlocked(adapter);
119 				}
120 
121 				this._senderInfo.releaseWaitingBuffer();
122 
123 				log.log(Level.FINE, "Client request read.");
124 				if (request != null)
125 				{
126 					Object result = handleObject(request);
127 					if (this._registrar.isValid())
128 					{
129 						this._senderInfo.setAttachment(result);
130 						this._registrar.register(this._senderInfo, SelectionKey.OP_WRITE);
131 					}
132 				}
133 			}
134 			else
135 			{
136 				log.log(Level.WARNING,
137 						"Connection timeout reached while waiting for Handshake complete. Closing connection.");
138 				IOUtils.closeQuite(channel);
139 				this._senderInfo = null;
140 			}
141 		}
142 		catch (IncompleteIOException ioe)
143 		{
144 			this._senderInfo.setWaitingBuffer(ioe.getIOBuffer());
145 			this._registrar.register(this._senderInfo, SelectionKey.OP_READ);
146 		}
147 		catch (NonReadableChannelException e)
148 		{
149 			log.log(Level.INFO, "Connection closed by client.");
150 			IOUtils.closeQuite(channel);
151 			this._senderInfo = null;
152 		}
153 		catch (ClosedChannelException cce)
154 		{
155 			log.log(Level.INFO, "Connection closed by client.");
156 			this._senderInfo = null;
157 			IOUtils.closeQuite(channel);
158 		}
159 		catch (IOException e)
160 		{
161 			if (channel.isConnected() && channel.isOpen())
162 			{
163 				// something goes completely wrong!
164 				log.log(Level.SEVERE, "!!! Exception while reading client data !!!", e);
165 			}
166 			else
167 			{
168 				log.log(Level.INFO, "Connection closed by client.");
169 			}
170 			IOUtils.closeQuite(channel);
171 			this._senderInfo = null;
172 		}
173 		catch (Exception e)
174 		{
175 			// OK, we got screwed up completely
176 			// something goes completely wrong!
177 			log.log(Level.SEVERE, "!!! Exception while reading client data !!!", e);
178 			IOUtils.closeQuite(channel);
179 			this._senderInfo = null;
180 		}
181 	}
182 
183 	/***
184 	 * Reads and deserialize a request object from a socket channel
185 	 * @param adapter the deserialization adapter to use
186 	 * @return the deserialized request object
187 	 * @throws IOException
188 	 */
189 	private Object read(SerializeAdapter adapter) throws IOException
190 	{
191 		ByteBufferInputStream in = null;
192 		ByteBuffer dataBuf = null;
193 		Object result = null;
194 		SocketChannel channel = (SocketChannel) this._senderInfo.getChannel();
195 
196 		try
197 		{
198 			if (!this._senderInfo.hasWaitingBuffer())
199 			{
200 				int length = IOUtils.readHeader(this._senderInfo);
201 				dataBuf = ByteBuffer.allocateDirect(length);
202 				log.log(Level.FINE, "Going to read client request with length: " + length);
203 			}
204 			else
205 			{
206 				dataBuf = this._senderInfo.getWaitingBuffer();
207 			}
208 
209 			IOUtils.nonBlockingRead(channel, dataBuf);
210 			dataBuf.flip();
211 
212 			in = new ByteBufferInputStream(dataBuf);
213 			result = deserialize(adapter, in);
214 		}
215 		finally
216 		{
217 			IOUtils.closeQuite(in);
218 		}
219 
220 		return result;
221 	}
222 
223 	/***
224 	 * @return
225 	 * @throws IOException
226 	 */
227 	private Object readBlocked(SerializeAdapter adapter) throws IOException
228 	{
229 		return deserialize(adapter, new ChannelInputStream(Channels
230 				.newInputStream((ReadableByteChannel) this._senderInfo.getChannel())));
231 	}
232 
233 	/***
234 	 * @param clientInfo
235 	 * @param in
236 	 * @return
237 	 * @throws IOException
238 	 */
239 	private Object deserialize(SerializeAdapter adapter, InputStream in) throws IOException
240 	{
241 		boolean compressed = this._receiverInfo.hasCompression() && this._senderInfo.hasCompression();
242 		return IOUtils.adapterDeserialize(adapter, in, compressed);
243 	}
244 
245 	/***
246 	 * @param obj
247 	 * @return
248 	 */
249 	private Object handleObject(Object obj)
250 	{
251 		Object result = null;
252 
253 		try
254 		{
255 			result = _handler.handle(obj);
256 		}
257 		catch (Exception e)
258 		{
259 			log.log(Level.WARNING, "Exception in ServerHandler " + _handler + " occured.", e);
260 			result = e;
261 		}
262 
263 		return result;
264 	}
265 }