1 /**********************************************************************
2 * ConnectionReader.java
3 * created on 05.03.2005 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/ConnectionReader.java,v $
5 * $Date: 2006/03/22 00:06:58 $
6 * $Revision: 1.9 $
7 *
8 * ====================================================================
9 *
10 * Copyright 2005-2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the ejoe framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30 package de.netseeker.ejoe;
31
32 import java.io.IOException;
33 import java.io.InputStream;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.Channels;
36 import java.nio.channels.ClosedChannelException;
37 import java.nio.channels.NonReadableChannelException;
38 import java.nio.channels.ReadableByteChannel;
39 import java.nio.channels.SelectionKey;
40 import java.nio.channels.SocketChannel;
41 import java.util.logging.Level;
42 import java.util.logging.Logger;
43
44 import de.netseeker.ejoe.adapter.AdapterFactory;
45 import de.netseeker.ejoe.adapter.SerializeAdapter;
46 import de.netseeker.ejoe.handler.ServerHandler;
47 import de.netseeker.ejoe.io.ByteBufferInputStream;
48 import de.netseeker.ejoe.io.ChannelInputStream;
49 import de.netseeker.ejoe.io.IOUtils;
50 import de.netseeker.ejoe.io.IncompleteIOException;
51
52 /***
53 * ConnectionReader targets two jobs:
54 * <ul>
55 * <li>Read (partial) data from a established client connection</li>
56 * <li>Invoke the server handler to process the received data</li>
57 * </ul>
58 * @author netseeker
59 * @since 0.3.0
60 */
61 final class ConnectionReader implements Runnable
62 {
63 private static final Logger log = Logger.getLogger(ConnectionReader.class.getName());
64
65 private final ChannelRegistrar _registrar;
66
67 private final ServerHandler _handler;
68
69 private ConnectionHeader _senderInfo, _receiverInfo;
70
71 /***
72 * @param channel a socket channel ready for reading and writing
73 * @param adapter (de)serialize adapter
74 * @param handler the working horse handling the transported input object
75 */
76 public ConnectionReader(final ChannelRegistrar registrar, ConnectionHeader receiverInfo,
77 ConnectionHeader senderInfo, final ServerHandler handler)
78 {
79 this._registrar = registrar;
80 this._receiverInfo = receiverInfo;
81 this._senderInfo = senderInfo;
82 this._handler = handler;
83 }
84
85
86
87
88
89
90 public void run()
91 {
92 Object request = null;
93 SocketChannel channel = (SocketChannel) this._senderInfo.getChannel();
94
95 try
96 {
97 if (this._senderInfo == null || !this._senderInfo.isConnected())
98 {
99 log.log(Level.FINEST, "Handshaking client...");
100
101 this._senderInfo = IOUtils.handshake(channel, this._receiverInfo, false,
102 EJConstants.EJOE_CONNECTION_TIMEOUT);
103 }
104
105 if (this._senderInfo != null)
106 {
107 log.log(Level.FINEST, "Client requested " + this._senderInfo.getAdapterName() );
108 SerializeAdapter adapter = AdapterFactory.createAdapter(this._senderInfo.getAdapterName());
109
110 if (this._receiverInfo.hasNonBlockingReadWrite())
111 {
112 log.log(Level.FINE, "Going to read client request on a non blocking socket...");
113 request = read(adapter);
114 }
115 else
116 {
117 log.log(Level.FINE, "Going to read client request on a blocking socket...");
118 request = readBlocked(adapter);
119 }
120
121 this._senderInfo.releaseWaitingBuffer();
122
123 log.log(Level.FINE, "Client request read.");
124 if (request != null)
125 {
126 Object result = handleObject(request);
127 if (this._registrar.isValid())
128 {
129 this._senderInfo.setAttachment(result);
130 this._registrar.register(this._senderInfo, SelectionKey.OP_WRITE);
131 }
132 }
133 }
134 else
135 {
136 log.log(Level.WARNING,
137 "Connection timeout reached while waiting for Handshake complete. Closing connection.");
138 IOUtils.closeQuite(channel);
139 this._senderInfo = null;
140 }
141 }
142 catch (IncompleteIOException ioe)
143 {
144 this._senderInfo.setWaitingBuffer(ioe.getIOBuffer());
145 this._registrar.register(this._senderInfo, SelectionKey.OP_READ);
146 }
147 catch (NonReadableChannelException e)
148 {
149 log.log(Level.INFO, "Connection closed by client.");
150 IOUtils.closeQuite(channel);
151 this._senderInfo = null;
152 }
153 catch (ClosedChannelException cce)
154 {
155 log.log(Level.INFO, "Connection closed by client.");
156 this._senderInfo = null;
157 IOUtils.closeQuite(channel);
158 }
159 catch (IOException e)
160 {
161 if (channel.isConnected() && channel.isOpen())
162 {
163
164 log.log(Level.SEVERE, "!!! Exception while reading client data !!!", e);
165 }
166 else
167 {
168 log.log(Level.INFO, "Connection closed by client.");
169 }
170 IOUtils.closeQuite(channel);
171 this._senderInfo = null;
172 }
173 catch (Exception e)
174 {
175
176
177 log.log(Level.SEVERE, "!!! Exception while reading client data !!!", e);
178 IOUtils.closeQuite(channel);
179 this._senderInfo = null;
180 }
181 }
182
183 /***
184 * Reads and deserialize a request object from a socket channel
185 * @param adapter the deserialization adapter to use
186 * @return the deserialized request object
187 * @throws IOException
188 */
189 private Object read(SerializeAdapter adapter) throws IOException
190 {
191 ByteBufferInputStream in = null;
192 ByteBuffer dataBuf = null;
193 Object result = null;
194 SocketChannel channel = (SocketChannel) this._senderInfo.getChannel();
195
196 try
197 {
198 if (!this._senderInfo.hasWaitingBuffer())
199 {
200 int length = IOUtils.readHeader(this._senderInfo);
201 dataBuf = ByteBuffer.allocateDirect(length);
202 log.log(Level.FINE, "Going to read client request with length: " + length);
203 }
204 else
205 {
206 dataBuf = this._senderInfo.getWaitingBuffer();
207 }
208
209 IOUtils.nonBlockingRead(channel, dataBuf);
210 dataBuf.flip();
211
212 in = new ByteBufferInputStream(dataBuf);
213 result = deserialize(adapter, in);
214 }
215 finally
216 {
217 IOUtils.closeQuite(in);
218 }
219
220 return result;
221 }
222
223 /***
224 * @return
225 * @throws IOException
226 */
227 private Object readBlocked(SerializeAdapter adapter) throws IOException
228 {
229 return deserialize(adapter, new ChannelInputStream(Channels
230 .newInputStream((ReadableByteChannel) this._senderInfo.getChannel())));
231 }
232
233 /***
234 * @param clientInfo
235 * @param in
236 * @return
237 * @throws IOException
238 */
239 private Object deserialize(SerializeAdapter adapter, InputStream in) throws IOException
240 {
241 boolean compressed = this._receiverInfo.hasCompression() && this._senderInfo.hasCompression();
242 return IOUtils.adapterDeserialize(adapter, in, compressed);
243 }
244
245 /***
246 * @param obj
247 * @return
248 */
249 private Object handleObject(Object obj)
250 {
251 Object result = null;
252
253 try
254 {
255 result = _handler.handle(obj);
256 }
257 catch (Exception e)
258 {
259 log.log(Level.WARNING, "Exception in ServerHandler " + _handler + " occured.", e);
260 result = e;
261 }
262
263 return result;
264 }
265 }