1 /**********************************************************************
2 * DefaultChannel.java
3 * created on 01.04.2006 by netseeker
4 * $Source$
5 * $Date$
6 * $Revision$
7 *
8 * ====================================================================
9 *
10 * Copyright 2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the de.netseeker.ejoe.io framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30
31 package de.netseeker.ejoe.io;
32
33 import java.io.IOException;
34 import java.net.InetAddress;
35 import java.nio.ByteBuffer;
36 import java.nio.channels.SelectionKey;
37 import java.nio.channels.SocketChannel;
38 import java.text.ParseException;
39 import java.util.logging.Level;
40 import java.util.logging.Logger;
41
42 import de.netseeker.ejoe.ConnectionHeader;
43 import de.netseeker.ejoe.EJConstants;
44 import de.netseeker.ejoe.cache.ByteBufferAllocator;
45
46 /***
47 * @author netseeker
48 * @since 0.3.9.1
49 */
50 class DefaultChannel extends DataChannel
51 {
52 private static final Logger logger = Logger.getLogger( DefaultChannel.class.getName() );
53
54 private static DefaultChannel dataChannel = new DefaultChannel();
55
56 private DefaultChannel()
57 {
58 super();
59 }
60
61 /***
62 * @return
63 */
64 public static DataChannel getInstance()
65 {
66 return dataChannel;
67 }
68
69 /***
70 * @param header
71 * @param sendBeforeReceive
72 * @param timeout
73 * @return
74 * @throws IOException
75 */
76 public ConnectionHeader handshake( final ConnectionHeader header, SocketChannel channel, long timeout )
77 throws IOException, ParseException
78 {
79 ConnectionHeader receiverHeader;
80 ByteBuffer magicBuf = null;
81
82 try
83 {
84
85 if ( header.isClient() )
86 {
87
88
89 ByteBuffer tmpBuf = header.toByteBuffer();
90 magicBuf = ByteBufferAllocator.allocate( tmpBuf.remaining() + 4, false );
91 magicBuf.putInt( EJConstants.EJOE_MAGIC_NUMBER );
92 magicBuf.put( tmpBuf );
93 magicBuf.flip();
94 logger.log( Level.FINEST, "Sending Clientheader: " + header );
95 semiBlockingWrite( channel, magicBuf, timeout );
96 if ( magicBuf.hasRemaining() ) return null;
97
98 magicBuf.clear();
99 magicBuf.limit( 1 );
100 }
101 else
102 {
103
104
105
106 magicBuf = ByteBufferAllocator.allocate( 5, false );
107 }
108
109
110
111 semiBlockingRead( channel, magicBuf, timeout );
112 if ( magicBuf.hasRemaining() )
113 {
114 throw new ParseException( "Header too long!", magicBuf.position() );
115 }
116
117 magicBuf.flip();
118 InetAddress address = channel.socket().getInetAddress();
119 int port = channel.socket().getPort();
120 receiverHeader = new ConnectionHeader( channel, address.getHostAddress() + ':' + port, header.isClient(),
121 magicBuf.get() );
122
123
124 if ( !header.isClient() )
125 {
126 if ( receiverHeader.hasAdapter() )
127 {
128
129
130 if ( readAdapterName( receiverHeader, magicBuf, timeout ) == -1 ) return null;
131 }
132
133
134
135 magicBuf.clear();
136 magicBuf.limit( 1 );
137 magicBuf.put( header.toByte() );
138 magicBuf.flip();
139 semiBlockingWrite( channel, magicBuf, timeout );
140 if ( magicBuf.hasRemaining() ) return null;
141 }
142
143 receiverHeader.setConnected( true );
144 }
145 finally
146 {
147 ByteBufferAllocator.collect( magicBuf );
148 }
149
150 return receiverHeader;
151 }
152
153 /***
154 * @param receiverHeader
155 * @param magicBuf
156 * @param timeout
157 * @return
158 * @throws IOException
159 */
160 private int readAdapterName( ConnectionHeader receiverHeader, ByteBuffer magicBuf, long timeout )
161 throws IOException
162 {
163 int length = magicBuf.getInt();
164 if ( length > 1024 )
165 {
166 throw new IOException( "Invalid length for adapter name detected. The request is not well formatted!" );
167 }
168
169
170 if ( length > 0 )
171 {
172
173 if ( length <= magicBuf.capacity() )
174 {
175 magicBuf.clear();
176 magicBuf.limit( length );
177 }
178
179 else
180 {
181 magicBuf = ByteBufferAllocator.allocate( length );
182 }
183
184
185 semiBlockingRead( receiverHeader.getChannel(), magicBuf, timeout );
186 if ( magicBuf.hasRemaining() ) return -1;
187 magicBuf.flip();
188
189 byte[] adapterArr = new byte[length];
190 magicBuf.get( adapterArr );
191 receiverHeader.setAdapterName( new String( adapterArr, EJConstants.EJOE_DEFAULT_CHARSET ) );
192 }
193
194 return length;
195 }
196
197
198
199
200
201
202 public int readHeader( ConnectionHeader header, long timeout ) throws IOException
203 {
204 ByteBuffer headerBuf = ByteBufferAllocator.allocate( EJConstants.NIO_HEADER_SIZE, false );
205 int read = 0;
206 SocketChannel channel = header.getChannel();
207
208 try
209 {
210 nonBlockingRead( channel, headerBuf );
211 headerBuf.flip();
212 read = headerBuf.getInt();
213 IOUtil.setReceiveBufferSize( channel.socket(), read );
214 }
215 catch ( IncompleteIOException ioe )
216 {
217 logger.log( Level.FINEST, "Incomplete header read detected, registering for read again." );
218
219
220 throw new IncompleteIOException( null, SelectionKey.OP_READ );
221 }
222 finally
223 {
224 ByteBufferAllocator.collect( headerBuf );
225 }
226
227 return read;
228 }
229
230
231
232
233
234
235 public void writeHeader( ConnectionHeader header, ByteBuffer buffer, long timeout ) throws IOException
236 {
237 ByteBuffer headerBuf = ByteBufferAllocator.allocate( EJConstants.NIO_HEADER_SIZE, false );
238 SocketChannel channel = header.getChannel();
239
240 int length = buffer != null ? buffer.remaining() : 0;
241 headerBuf.putInt( length );
242 headerBuf.flip();
243 try
244 {
245 nonBlockingWrite( channel, headerBuf );
246 IOUtil.setSendBufferSize( channel.socket(), length );
247 }
248 catch ( IncompleteIOException ioe )
249 {
250 logger.log( Level.FINEST, "Incomplete header write detected, registering for write again." );
251
252 throw new IncompleteIOException( null, SelectionKey.OP_WRITE );
253 }
254 finally
255 {
256 ByteBufferAllocator.collect( headerBuf );
257 }
258 }
259 }