View Javadoc

1   /**********************************************************************
2    * DefaultChannel.java
3    * created on 01.04.2006 by netseeker
4    * $Source$
5    * $Date$
6    * $Revision$
7    *
8    * ====================================================================
9    *
10   *  Copyright 2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the de.netseeker.ejoe.io framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  
31  package de.netseeker.ejoe.io;
32  
33  import java.io.IOException;
34  import java.net.InetAddress;
35  import java.nio.ByteBuffer;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.SocketChannel;
38  import java.text.ParseException;
39  import java.util.logging.Level;
40  import java.util.logging.Logger;
41  
42  import de.netseeker.ejoe.ConnectionHeader;
43  import de.netseeker.ejoe.EJConstants;
44  import de.netseeker.ejoe.cache.ByteBufferAllocator;
45  
46  /***
47   * @author netseeker
48   * @since 0.3.9.1
49   */
50  class DefaultChannel extends DataChannel
51  {
52      private static final Logger   logger      = Logger.getLogger( DefaultChannel.class.getName() );
53  
54      private static DefaultChannel dataChannel = new DefaultChannel();
55  
56      private DefaultChannel()
57      {
58          super();
59      }
60  
61      /***
62       * @return
63       */
64      public static DataChannel getInstance()
65      {
66          return dataChannel;
67      }
68  
69      /***
70       * @param header
71       * @param sendBeforeReceive
72       * @param timeout
73       * @return
74       * @throws IOException
75       */
76      public ConnectionHeader handshake( final ConnectionHeader header, SocketChannel channel, long timeout )
77              throws IOException, ParseException
78      {
79          ConnectionHeader receiverHeader;
80          ByteBuffer magicBuf = null;
81  
82          try
83          {
84              // shall we act as clientside and initialize the handshake?
85              if ( header.isClient() )
86              {
87                  // ok, get the control bits as well as the adapter name in a
88                  // ByteChannel
89                  ByteBuffer tmpBuf = header.toByteBuffer();
90                  magicBuf = ByteBufferAllocator.allocate( tmpBuf.remaining() + 4, false );
91                  magicBuf.putInt( EJConstants.EJOE_MAGIC_NUMBER );
92                  magicBuf.put( tmpBuf );
93                  magicBuf.flip();
94                  logger.log( Level.FINEST, "Sending Clientheader: " + header );
95                  semiBlockingWrite( channel, magicBuf, timeout );
96                  if ( magicBuf.hasRemaining() ) return null;
97                  // the server will just answer with the header byte
98                  magicBuf.clear();
99                  magicBuf.limit( 1 );
100             }
101             else
102             {
103                 // expect to read the header byte and an integer (the size of
104                 // the forthcoming adapter name string)
105                 // the magic integer was already read in #handshake
106                 magicBuf = ByteBufferAllocator.allocate( 5, false );
107             }
108 
109             // Server: read the client header byte + sizeof adapter name
110             // Client: read just the server header byte
111             semiBlockingRead( channel, magicBuf, timeout );
112             if ( magicBuf.hasRemaining() )
113             {
114                 throw new ParseException( "Header too long!", magicBuf.position() );
115             }
116 
117             magicBuf.flip();
118             InetAddress address = channel.socket().getInetAddress();
119             int port = channel.socket().getPort();
120             receiverHeader = new ConnectionHeader( channel, address.getHostAddress() + ':' + port, header.isClient(),
121                                                    magicBuf.get() );
122 
123             // shall we act as server side and answer to the handshake request?
124             if ( !header.isClient() )
125             {
126                 if ( receiverHeader.hasAdapter() )
127                 {
128                     // ok, we have already read the client header byte, now read
129                     // the adapter name string
130                     if ( readAdapterName( receiverHeader, magicBuf, timeout ) == -1 ) return null;
131                 }
132 
133                 // answer to the client request and send our serverside
134                 // headerbyte
135                 magicBuf.clear();
136                 magicBuf.limit( 1 );
137                 magicBuf.put( header.toByte() );
138                 magicBuf.flip();
139                 semiBlockingWrite( channel, magicBuf, timeout );
140                 if ( magicBuf.hasRemaining() ) return null;
141             }
142 
143             receiverHeader.setConnected( true );
144         }
145         finally
146         {
147             ByteBufferAllocator.collect( magicBuf );
148         }
149 
150         return receiverHeader;
151     }
152 
153     /***
154      * @param receiverHeader
155      * @param magicBuf
156      * @param timeout
157      * @return
158      * @throws IOException
159      */
160     private int readAdapterName( ConnectionHeader receiverHeader, ByteBuffer magicBuf, long timeout )
161             throws IOException
162     {
163         int length = magicBuf.getInt();
164         if ( length > 1024 )
165         {
166             throw new IOException( "Invalid length for adapter name detected. The request is not well formatted!" );
167         }
168 
169         // client requested a special adapter?
170         if ( length > 0 )
171         {
172             // limit the existing ByteBuffer if it's big enough
173             if ( length <= magicBuf.capacity() )
174             {
175                 magicBuf.clear();
176                 magicBuf.limit( length );
177             }
178             // otherwise allocate a new one
179             else
180             {
181                 magicBuf = ByteBufferAllocator.allocate( length );
182             }
183 
184             // read the adapter name
185             semiBlockingRead( receiverHeader.getChannel(), magicBuf, timeout );
186             if ( magicBuf.hasRemaining() ) return -1;
187             magicBuf.flip();
188 
189             byte[] adapterArr = new byte[length];
190             magicBuf.get( adapterArr );
191             receiverHeader.setAdapterName( new String( adapterArr, EJConstants.EJOE_DEFAULT_CHARSET ) );
192         }
193 
194         return length;
195     }
196 
197     /*
198      * (non-Javadoc)
199      * 
200      * @see de.netseeker.ejoe.io.DataChannel#readHeader(long)
201      */
202     public int readHeader( ConnectionHeader header, long timeout ) throws IOException
203     {
204         ByteBuffer headerBuf = ByteBufferAllocator.allocate( EJConstants.NIO_HEADER_SIZE, false );
205         int read = 0;
206         SocketChannel channel = header.getChannel();
207 
208         try
209         {
210             nonBlockingRead( channel, headerBuf );
211             headerBuf.flip();
212             read = headerBuf.getInt();
213             IOUtil.setReceiveBufferSize( channel.socket(), read );
214         }
215         catch ( IncompleteIOException ioe )
216         {
217             logger.log( Level.FINEST, "Incomplete header read detected, registering for read again." );
218             // ioe.setIOBuffer(null);
219             // ioe.setSelectionInterest(SelectionKey.OP_READ);
220             throw new IncompleteIOException( null, SelectionKey.OP_READ );
221         }
222         finally
223         {
224             ByteBufferAllocator.collect( headerBuf );
225         }
226 
227         return read;
228     }
229 
230     /*
231      * (non-Javadoc)
232      * 
233      * @see de.netseeker.ejoe.io.DataChannel#writeHeader(de.netseeker.ejoe.ConnectionHeader, java.nio.ByteBuffer, long)
234      */
235     public void writeHeader( ConnectionHeader header, ByteBuffer buffer, long timeout ) throws IOException
236     {
237         ByteBuffer headerBuf = ByteBufferAllocator.allocate( EJConstants.NIO_HEADER_SIZE, false );
238         SocketChannel channel = header.getChannel();
239 
240         int length = buffer != null ? buffer.remaining() : 0;
241         headerBuf.putInt( length );
242         headerBuf.flip();
243         try
244         {
245             nonBlockingWrite( channel, headerBuf );
246             IOUtil.setSendBufferSize( channel.socket(), length );
247         }
248         catch ( IncompleteIOException ioe )
249         {
250             logger.log( Level.FINEST, "Incomplete header write detected, registering for write again." );
251             // ioe.setIOBuffer(null);
252             throw new IncompleteIOException( null, SelectionKey.OP_WRITE );
253         }
254         finally
255         {
256             ByteBufferAllocator.collect( headerBuf );
257         }
258     }
259 }