1 /**********************************************************************
2 * DataChannel.java
3 * created on 10.02.2006 by netseeker
4 * $Source$
5 * $Date$
6 * $Revision$
7 *
8 * ====================================================================
9 *
10 * Copyright 2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the ejoe framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30 package de.netseeker.ejoe.io;
31
32 import java.io.IOException;
33 import java.io.UnsupportedEncodingException;
34 import java.net.SocketTimeoutException;
35 import java.nio.ByteBuffer;
36 import java.nio.channels.ClosedChannelException;
37 import java.nio.channels.ReadableByteChannel;
38 import java.nio.channels.SelectionKey;
39 import java.nio.channels.SocketChannel;
40 import java.nio.channels.WritableByteChannel;
41 import java.text.ParseException;
42 import java.util.logging.Level;
43 import java.util.logging.Logger;
44
45 import de.netseeker.ejoe.ConnectionHeader;
46 import de.netseeker.ejoe.EJConstants;
47 import de.netseeker.ejoe.cache.ByteBufferAllocator;
48
49 /***
50 * Utility class handling all socket oriented data IO on nio channels. DataChannels must be implemented as singletons to
51 * avoid creation of a new object for each socket IO operation. Otherwise heavy load could result in fast-growing memory
52 * consumption.
53 *
54 * @author netseeker
55 * @since 0.3.9.1
56 */
57 public class DataChannel
58 {
59 private static final Logger logger = Logger.getLogger( DataChannel.class.getName() );
60
61 private static DataChannel dataChannel = new DataChannel();
62
63 /***
64 * Singleton with hidden constructor, only child classes are allowed to construct new instances
65 */
66 protected DataChannel()
67 {
68 super();
69 }
70
71 /***
72 * Invoking this method has the same effect as invoking {@link DataChannel#getInstance(null)}
73 *
74 * @return a default instance of DataChannel
75 */
76 public static DataChannel getInstance()
77 {
78 return getInstance( null );
79 }
80
81 /***
82 * Returns appropiate instance of DataChannel for the given connection header. If the header is null an instance of
83 * this class will be returned.
84 *
85 * @param header a valid connection header or null
86 * @return an instance of DataChannel
87 */
88 public static DataChannel getInstance( ConnectionHeader header )
89 {
90 if ( header == null )
91 {
92 return dataChannel;
93 }
94 else if ( header.isHttp() )
95 {
96 return HttpChannel.getInstance();
97 }
98 else
99 {
100 return DefaultChannel.getInstance();
101 }
102 }
103
104 /***
105 * Handshake for a socket channel. It's used as workaround for a know issue with java sockets: Sometimes only the
106 * first byte will get transferred through a socket connection when reading from it first time. The other bytes will
107 * follow not until the next read. This method sends/receives one Byte through the socket to "initialize" the socket
108 * channel. So all following read/write operations don't have to handle that "1-Byte issue". The send/received Byte
109 * is used also as connection header, it contains information about compression, nio usage, if the connection is a
110 * persistent or non-persistent one...
111 *
112 * @param sendBeforeReceive if true we will try to send one byte then read one byte otherwise we will use the
113 * opposite way around.
114 * @throws IOException
115 */
116 public ConnectionHeader handshake( final ConnectionHeader header, SocketChannel channel, long timeout )
117 throws IOException, ParseException
118 {
119 boolean isHttp = header.isHttp();
120 ConnectionHeader receiverHeader = null;
121 ByteBuffer magicBuf = null;
122 byte[] preReadData = null;
123
124 if ( !header.isClient() )
125 {
126
127 magicBuf = ByteBufferAllocator.allocate( 4, false );
128 semiBlockingRead( channel, magicBuf, timeout );
129
130
131 if ( magicBuf.hasRemaining() ) return null;
132 magicBuf.flip();
133
134 preReadData = new byte[4];
135 magicBuf.get( preReadData );
136 magicBuf.rewind();
137
138 int magicNo = magicBuf.getInt();
139 isHttp = (magicNo != EJConstants.EJOE_MAGIC_NUMBER);
140 }
141
142 try
143 {
144
145 if ( !isHttp )
146 {
147
148 receiverHeader = DefaultChannel.getInstance().handshake( header, channel, timeout );
149 }
150
151 else
152 {
153
154 receiverHeader = ((HttpChannel) HttpChannel.getInstance()).handshake( header, channel, preReadData,
155 timeout );
156 }
157
158
159 if ( receiverHeader.isClient() && receiverHeader.getHost() == null )
160 {
161 receiverHeader.setHost( header.getHost() );
162 }
163
164 if ( header.isClient() && header.hasAdapter() && receiverHeader.getAdapterName() == null )
165 {
166 receiverHeader.setAdapterName( header.getAdapterName() );
167 }
168 }
169 catch ( IncompleteIOException ioe )
170 {
171
172
173
174 }
175
176 return receiverHeader;
177 }
178
179 /***
180 * Tries to send the given ByteBuffer completely through the given SocketChannel three times
181 *
182 * @param channel
183 * @param buffer
184 * @throws IncompleteIOException if the given ByteBuffer could not be send completely
185 * @throws IOException
186 */
187 public void nonBlockingWrite( WritableByteChannel channel, ByteBuffer buffer ) throws IOException
188 {
189 int runs = 0;
190
191 do
192 {
193 channel.write( buffer );
194 runs++;
195 }
196 while ( buffer.hasRemaining() && runs < EJConstants.NIO_MAX_ITERATIONS );
197
198 if ( buffer.hasRemaining() )
199 {
200 logger.log( Level.FINEST, "Incomplete write detected, registering for write again." );
201 buffer.compact();
202 throw new IncompleteIOException( buffer, SelectionKey.OP_WRITE );
203 }
204 }
205
206 /***
207 * Tries to send the given ByteBuffer completely through the given SocketChannel within a given timeout
208 *
209 * @param channel
210 * @param buffer
211 * @param timeout
212 * @throws IncompleteIOException if the given ByteBuffer could not be send completely
213 * @throws IOException
214 */
215 public void semiBlockingWrite( WritableByteChannel channel, ByteBuffer buffer, long timeout ) throws IOException
216 {
217 long timestamp = System.currentTimeMillis();
218 long timePeriod = -1;
219
220 do
221 {
222 channel.write( buffer );
223 timePeriod = System.currentTimeMillis() - timestamp;
224 }
225 while ( buffer.hasRemaining() && (timePeriod < timeout) );
226
227 if ( timePeriod >= timeout )
228 {
229 throw new SocketTimeoutException();
230 }
231
232 if ( buffer.hasRemaining() )
233 {
234 logger.log( Level.FINEST, "Incomplete write detected, registering for write again." );
235 buffer.compact();
236 throw new IncompleteIOException( buffer, SelectionKey.OP_WRITE );
237 }
238 }
239
240 /***
241 * Tries to send the given ByteBuffer completely through the given SocketChannel within a given timeout
242 *
243 * @param channel
244 * @param buffer
245 * @return
246 * @throws IOException
247 */
248 public static void nonBlockingRead( ReadableByteChannel channel, ByteBuffer buffer ) throws IOException
249 {
250 int read = 0, runs = 0;
251
252 try
253 {
254 do
255 {
256 read = channel.read( buffer );
257 runs++;
258 }
259
260
261 while ( read != -1 && buffer.hasRemaining() && runs < EJConstants.NIO_MAX_ITERATIONS );
262 }
263 catch ( IOException e )
264 {
265
266
267 ClosedChannelException che = new ClosedChannelException();
268 che.setStackTrace( e.getStackTrace() );
269 che.initCause( e.getCause() );
270 throw che;
271 }
272
273 if ( buffer.hasRemaining() )
274 {
275 if ( read == -1 )
276 {
277 throw new ClosedChannelException();
278 }
279 else
280 {
281 logger.log( Level.FINEST, "Incomplete read detected, registering for read again." );
282 throw new IncompleteIOException( buffer, SelectionKey.OP_READ );
283 }
284 }
285 }
286
287 /***
288 * Tries to read ByteBuffer.remaining() bytes the into given ByteBuffer from the given SocketChannel within a given
289 * timeout.
290 *
291 * @param channel
292 * @param buffer
293 * @param timeout
294 * @return
295 * @throws IOException
296 */
297 public static void semiBlockingRead( ReadableByteChannel channel, ByteBuffer buffer, long timeout )
298 throws IOException
299 {
300 long timestamp = System.currentTimeMillis();
301 long timePeriod = -1;
302 int read = 0;
303
304 try
305 {
306 do
307 {
308 read = channel.read( buffer );
309 timePeriod = System.currentTimeMillis() - timestamp;
310 }
311 while ( read != -1 && buffer.hasRemaining() && (timePeriod < timeout) );
312 }
313 catch ( IOException e )
314 {
315
316 ClosedChannelException che = new ClosedChannelException();
317 che.setStackTrace( e.getStackTrace() );
318 che.initCause( e.getCause() );
319 throw che;
320 }
321
322 if ( timePeriod >= timeout )
323 {
324 throw new SocketTimeoutException();
325 }
326
327 if ( buffer.hasRemaining() )
328 {
329 if ( read == -1 )
330 {
331 throw new ClosedChannelException();
332 }
333 else
334 {
335 logger.log( Level.FINEST, "Incomplete read detected, registering for read again." );
336 throw new IncompleteIOException( buffer, SelectionKey.OP_READ );
337 }
338 }
339 }
340
341 /***
342 * Receives a EJOE specific header containing the size of the next ByteBuffer.
343 *
344 * @param timeout read timeout
345 * @return the length of the following data package
346 * @throws IOException
347 */
348 public int readHeader( ConnectionHeader header, long timeout ) throws IOException
349 {
350 throw new UnsupportedOperationException();
351 }
352
353 /***
354 * Sends a EJOE specific header containing the lengh of the given ByteBuffer
355 *
356 * @param timeout write timeout
357 * @throws IOException
358 */
359 public void writeHeader( ConnectionHeader header, ByteBuffer buffer, long timeout ) throws IOException
360 {
361 throw new UnsupportedOperationException();
362 }
363
364 /***
365 * Decodes and reformats request data if the underlying protocol layer makes it neccessary
366 *
367 * @param buffer
368 */
369 public ByteBuffer decode( ByteBuffer buffer ) throws UnsupportedEncodingException
370 {
371
372 return buffer;
373 }
374 }