View Javadoc

1   /**********************************************************************
2    * DataChannel.java
3    * created on 10.02.2006 by netseeker
4    * $Source$
5    * $Date$
6    * $Revision$
7    *
8    * ====================================================================
9    *
10   *  Copyright 2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the ejoe framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  package de.netseeker.ejoe.io;
31  
32  import java.io.IOException;
33  import java.io.UnsupportedEncodingException;
34  import java.net.SocketTimeoutException;
35  import java.nio.ByteBuffer;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.ReadableByteChannel;
38  import java.nio.channels.SelectionKey;
39  import java.nio.channels.SocketChannel;
40  import java.nio.channels.WritableByteChannel;
41  import java.text.ParseException;
42  import java.util.logging.Level;
43  import java.util.logging.Logger;
44  
45  import de.netseeker.ejoe.ConnectionHeader;
46  import de.netseeker.ejoe.EJConstants;
47  import de.netseeker.ejoe.cache.ByteBufferAllocator;
48  
49  /***
50   * Utility class handling all socket oriented data IO on nio channels. DataChannels must be implemented as singletons to
51   * avoid creation of a new object for each socket IO operation. Otherwise heavy load could result in fast-growing memory
52   * consumption.
53   * 
54   * @author netseeker
55   * @since 0.3.9.1
56   */
57  public class DataChannel
58  {
59      private static final Logger logger      = Logger.getLogger( DataChannel.class.getName() );
60  
61      private static DataChannel  dataChannel = new DataChannel();
62  
63      /***
64       * Singleton with hidden constructor, only child classes are allowed to construct new instances
65       */
66      protected DataChannel()
67      {
68          super();
69      }
70  
71      /***
72       * Invoking this method has the same effect as invoking {@link DataChannel#getInstance(null)}
73       * 
74       * @return a default instance of DataChannel
75       */
76      public static DataChannel getInstance()
77      {
78          return getInstance( null );
79      }
80  
81      /***
82       * Returns appropiate instance of DataChannel for the given connection header. If the header is null an instance of
83       * this class will be returned.
84       * 
85       * @param header a valid connection header or null
86       * @return an instance of DataChannel
87       */
88      public static DataChannel getInstance( ConnectionHeader header )
89      {
90          if ( header == null )
91          {
92              return dataChannel;
93          }
94          else if ( header.isHttp() )
95          {
96              return HttpChannel.getInstance();
97          }
98          else
99          {
100             return DefaultChannel.getInstance();
101         }
102     }
103 
104     /***
105      * Handshake for a socket channel. It's used as workaround for a know issue with java sockets: Sometimes only the
106      * first byte will get transferred through a socket connection when reading from it first time. The other bytes will
107      * follow not until the next read. This method sends/receives one Byte through the socket to "initialize" the socket
108      * channel. So all following read/write operations don't have to handle that "1-Byte issue". The send/received Byte
109      * is used also as connection header, it contains information about compression, nio usage, if the connection is a
110      * persistent or non-persistent one...
111      * 
112      * @param sendBeforeReceive if true we will try to send one byte then read one byte otherwise we will use the
113      *            opposite way around.
114      * @throws IOException
115      */
116     public ConnectionHeader handshake( final ConnectionHeader header, SocketChannel channel, long timeout )
117             throws IOException, ParseException
118     {
119         boolean isHttp = header.isHttp();
120         ConnectionHeader receiverHeader = null;
121         ByteBuffer magicBuf = null;
122         byte[] preReadData = null;
123 
124         if ( !header.isClient() )
125         {
126             // expect to read the first four bytes for determining the used protocol
127             magicBuf = ByteBufferAllocator.allocate( 4, false );
128             semiBlockingRead( channel, magicBuf, timeout );
129             // at least the handshake must be read in one operation
130             // if not prevent us from dealing with bad networks or crappy clients
131             if ( magicBuf.hasRemaining() ) return null;
132             magicBuf.flip();
133             // copy the read four byte into a buffer - we will need them again maybe
134             preReadData = new byte[4];
135             magicBuf.get( preReadData );
136             magicBuf.rewind();
137             // read the first four bytes as int
138             int magicNo = magicBuf.getInt();
139             isHttp = (magicNo != EJConstants.EJOE_MAGIC_NUMBER);
140         }
141 
142         try
143         {
144             // seems like the usual EJOE protocol
145             if ( !isHttp )
146             {
147                 // complete the handshake using our DefaultChannel, we can skip the preread four bytes
148                 receiverHeader = DefaultChannel.getInstance().handshake( header, channel, timeout );
149             }
150             // seems like HTTP protocol
151             else
152             {
153                 // complete the handshake using a HttpChannel and hand over the preread four bytes
154                 receiverHeader = ((HttpChannel) HttpChannel.getInstance()).handshake( header, channel, preReadData,
155                                                                                       timeout );
156             }
157 
158             // copy the host into the received client header
159             if ( receiverHeader.isClient() && receiverHeader.getHost() == null )
160             {
161                 receiverHeader.setHost( header.getHost() );
162             }
163             // copy the adapter into the returned server header
164             if ( header.isClient() && header.hasAdapter() && receiverHeader.getAdapterName() == null )
165             {
166                 receiverHeader.setAdapterName( header.getAdapterName() );
167             }
168         }
169         catch ( IncompleteIOException ioe )
170         {
171             // nothing to do
172             // at least the handshake must be read in one operation
173             // if not prevent us from dealing with bad networks or crappy clients
174         }
175 
176         return receiverHeader;
177     }
178 
179     /***
180      * Tries to send the given ByteBuffer completely through the given SocketChannel three times
181      * 
182      * @param channel
183      * @param buffer
184      * @throws IncompleteIOException if the given ByteBuffer could not be send completely
185      * @throws IOException
186      */
187     public void nonBlockingWrite( WritableByteChannel channel, ByteBuffer buffer ) throws IOException
188     {
189         int runs = 0;
190 
191         do
192         {
193             channel.write( buffer );
194             runs++;
195         }
196         while ( buffer.hasRemaining() && runs < EJConstants.NIO_MAX_ITERATIONS );
197 
198         if ( buffer.hasRemaining() )
199         {
200             logger.log( Level.FINEST, "Incomplete write detected, registering for write again." );
201             buffer.compact();
202             throw new IncompleteIOException( buffer, SelectionKey.OP_WRITE );
203         }
204     }
205 
206     /***
207      * Tries to send the given ByteBuffer completely through the given SocketChannel within a given timeout
208      * 
209      * @param channel
210      * @param buffer
211      * @param timeout
212      * @throws IncompleteIOException if the given ByteBuffer could not be send completely
213      * @throws IOException
214      */
215     public void semiBlockingWrite( WritableByteChannel channel, ByteBuffer buffer, long timeout ) throws IOException
216     {
217         long timestamp = System.currentTimeMillis();
218         long timePeriod = -1;
219 
220         do
221         {
222             channel.write( buffer );
223             timePeriod = System.currentTimeMillis() - timestamp;
224         }
225         while ( buffer.hasRemaining() && (timePeriod < timeout) );
226 
227         if ( timePeriod >= timeout )
228         {
229             throw new SocketTimeoutException();
230         }
231 
232         if ( buffer.hasRemaining() )
233         {
234             logger.log( Level.FINEST, "Incomplete write detected, registering for write again." );
235             buffer.compact();
236             throw new IncompleteIOException( buffer, SelectionKey.OP_WRITE );
237         }
238     }
239 
240     /***
241      * Tries to send the given ByteBuffer completely through the given SocketChannel within a given timeout
242      * 
243      * @param channel
244      * @param buffer
245      * @return
246      * @throws IOException
247      */
248     public static void nonBlockingRead( ReadableByteChannel channel, ByteBuffer buffer ) throws IOException
249     {
250         int read = 0, runs = 0;
251 
252         try
253         {
254             do
255             {
256                 read = channel.read( buffer );
257                 runs++;
258             }
259             // read until end of data is reached (-1) or the buffer is full or we tried to read for
260             // EJConstants.NIO_MAX_ITERATIONS
261             while ( read != -1 && buffer.hasRemaining() && runs < EJConstants.NIO_MAX_ITERATIONS );
262         }
263         catch ( IOException e )
264         {
265             // most likely the sender did close the connection or something other (firewall?) does interfere the
266             // communication
267             ClosedChannelException che = new ClosedChannelException();
268             che.setStackTrace( e.getStackTrace() );
269             che.initCause( e.getCause() );
270             throw che;
271         }
272 
273         if ( buffer.hasRemaining() )
274         {
275             if ( read == -1 )
276             {
277                 throw new ClosedChannelException();
278             }
279             else
280             {
281                 logger.log( Level.FINEST, "Incomplete read detected, registering for read again." );
282                 throw new IncompleteIOException( buffer, SelectionKey.OP_READ );
283             }
284         }
285     }
286 
287     /***
288      * Tries to read ByteBuffer.remaining() bytes the into given ByteBuffer from the given SocketChannel within a given
289      * timeout.
290      * 
291      * @param channel
292      * @param buffer
293      * @param timeout
294      * @return
295      * @throws IOException
296      */
297     public static void semiBlockingRead( ReadableByteChannel channel, ByteBuffer buffer, long timeout )
298             throws IOException
299     {
300         long timestamp = System.currentTimeMillis();
301         long timePeriod = -1;
302         int read = 0;
303 
304         try
305         {
306             do
307             {
308                 read = channel.read( buffer );
309                 timePeriod = System.currentTimeMillis() - timestamp;
310             }
311             while ( read != -1 && buffer.hasRemaining() && (timePeriod < timeout) );
312         }
313         catch ( IOException e )
314         {
315             // most likely the sender did close the connection
316             ClosedChannelException che = new ClosedChannelException();
317             che.setStackTrace( e.getStackTrace() );
318             che.initCause( e.getCause() );
319             throw che;
320         }
321 
322         if ( timePeriod >= timeout )
323         {
324             throw new SocketTimeoutException();
325         }
326 
327         if ( buffer.hasRemaining() )
328         {
329             if ( read == -1 )
330             {
331                 throw new ClosedChannelException();
332             }
333             else
334             {
335                 logger.log( Level.FINEST, "Incomplete read detected, registering for read again." );
336                 throw new IncompleteIOException( buffer, SelectionKey.OP_READ );
337             }
338         }
339     }
340 
341     /***
342      * Receives a EJOE specific header containing the size of the next ByteBuffer.
343      * 
344      * @param timeout read timeout
345      * @return the length of the following data package
346      * @throws IOException
347      */
348     public int readHeader( ConnectionHeader header, long timeout ) throws IOException
349     {
350         throw new UnsupportedOperationException();
351     }
352 
353     /***
354      * Sends a EJOE specific header containing the lengh of the given ByteBuffer
355      * 
356      * @param timeout write timeout
357      * @throws IOException
358      */
359     public void writeHeader( ConnectionHeader header, ByteBuffer buffer, long timeout ) throws IOException
360     {
361         throw new UnsupportedOperationException();
362     }
363 
364     /***
365      * Decodes and reformats request data if the underlying protocol layer makes it neccessary
366      * 
367      * @param buffer
368      */
369     public ByteBuffer decode( ByteBuffer buffer ) throws UnsupportedEncodingException
370     {
371         // default implementation does nothing
372         return buffer;
373     }
374 }