View Javadoc

1   /**********************************************************************
2    * IOUtils.java
3    * created on 15.08.2004 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/io/IOUtils.java,v $
5    * $Date: 2006/02/04 14:15:53 $
6    * $Revision: 1.32 $
7    *********************************************************************/
8   
9   package de.netseeker.ejoe.io;
10  
11  import java.io.BufferedInputStream;
12  import java.io.BufferedOutputStream;
13  import java.io.IOException;
14  import java.io.InputStream;
15  import java.io.OutputStream;
16  import java.io.Reader;
17  import java.io.Writer;
18  import java.net.Socket;
19  import java.net.SocketException;
20  import java.nio.ByteBuffer;
21  import java.nio.channels.Channel;
22  import java.nio.channels.ClosedChannelException;
23  import java.nio.channels.ReadableByteChannel;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.SocketChannel;
27  import java.nio.channels.WritableByteChannel;
28  import java.util.logging.Level;
29  import java.util.logging.Logger;
30  import java.util.zip.DeflaterOutputStream;
31  import java.util.zip.GZIPInputStream;
32  import java.util.zip.GZIPOutputStream;
33  
34  import de.netseeker.ejoe.ConnectionHeader;
35  import de.netseeker.ejoe.EJConstants;
36  import de.netseeker.ejoe.adapter.SerializeAdapter;
37  
38  /***
39   * Utility methods for quietly closing I/O streams, readers, writers, channels
40   * and Selectors, as well as methods for non-blocking, semi-blocking and fully
41   * blocking I/O read/write operations.
42   *
43   * @author netseeker aka Michael Manske
44   */
45  public final class IOUtils
46  {
47  	public static final int		NIO_MAX_ITERATIONS	= 3;
48  
49  	private static final Logger	logger				= Logger.getLogger(IOUtils.class.getName());
50  
51  	/***
52  	 * Tries to close an OutputStream and handles null values and IOExceptions
53  	 * quietly
54  	 *
55  	 * @param out
56  	 */
57  	public static void closeQuite(OutputStream out)
58  	{
59  		if (out != null)
60  		{
61  			try
62  			{
63  				out.close();
64  			}
65  			catch (IOException e)
66  			{
67  				// do nothing
68  			}
69  		}
70  	}
71  
72  	/***
73  	 * Tries to close a Writer and handles null values and IOExceptions quietly
74  	 *
75  	 * @param out
76  	 */
77  	public static void closeQuite(Writer out)
78  	{
79  		if (out != null)
80  		{
81  			try
82  			{
83  				out.close();
84  			}
85  			catch (IOException e)
86  			{
87  				// do nothing
88  			}
89  		}
90  	}
91  
92  	/***
93  	 * Tries to close an InputStream and handles null values and IOExceptions
94  	 * quietly
95  	 *
96  	 * @param in
97  	 */
98  	public static void closeQuite(InputStream in)
99  	{
100 		if (in != null)
101 		{
102 			try
103 			{
104 				in.close();
105 			}
106 			catch (IOException e)
107 			{
108 				// do nothing
109 			}
110 		}
111 	}
112 
113 	/***
114 	 * Tries to close a Reader and handles null values and IOExceptions quietly
115 	 *
116 	 * @param reader
117 	 */
118 	public static void closeQuite(Reader reader)
119 	{
120 		if (reader != null)
121 		{
122 			try
123 			{
124 				reader.close();
125 			}
126 			catch (IOException e)
127 			{
128 				// do nothing
129 			}
130 		}
131 	}
132 
133 	/***
134 	 * Tries to close an NIO Channel and handles null values and IOExceptions
135 	 * quietly
136 	 *
137 	 * @param channel
138 	 */
139 	public static void closeQuite(Channel channel)
140 	{
141 		if (channel != null)
142 		{
143 			try
144 			{
145 				channel.close();
146 			}
147 			catch (IOException e)
148 			{
149 				// do nothing
150 			}
151 		}
152 	}
153 
154 	/***
155 	 * Tries to close a NIO Selector and handles null values and IOExceptions
156 	 * quietly
157 	 *
158 	 * @param selector
159 	 */
160 	public static void closeQuite(Selector selector)
161 	{
162 		if (selector != null && selector.isOpen())
163 		{
164 			try
165 			{
166 				selector.close();
167 			}
168 			catch (IOException e)
169 			{
170 				// do nothing
171 			}
172 		}
173 	}
174 
175 	/***
176 	 * Handshake for a socket channel. It's used as workaround for a know issue
177 	 * with java sockets: Sometimes only the first byte will get transferred
178 	 * through a socket connection when reading from it first time. The other
179 	 * bytes will follow not until the next read. This method sends/receives one
180 	 * Byte through the socket to "initialize" the socket channel. So all
181 	 * following read/write operations don't have to handle that "1-Byte issue".
182 	 * The send/received Byte is used also as connection header, it contains
183 	 * information about compression, nio usage, if the connection is a
184 	 * persistent or non-persistent one...
185 	 *
186 	 * @param channel
187 	 *            A connected, readable and writeable socket channel
188 	 * @param sendBeforeReceive
189 	 *            if true we will try to send one byte then read one byte
190 	 *            otherwise we will use the opposite way around.
191 	 * @throws IOException
192 	 */
193 	public static ConnectionHeader handshake(SocketChannel channel, final ConnectionHeader header,
194 			boolean sendBeforeReceive, long timeout) throws IOException
195 	{
196 		ConnectionHeader receiverHeader;
197 		ByteBuffer magicBuf = null;
198 
199 		try
200 		{
201 			// shall we act as clientside and initialize the handshake?
202 			if (sendBeforeReceive)
203 			{
204 				// ok, get the control bits as well as the adapter name in a
205 				// ByteChannel
206 				magicBuf = header.toByteBuffer();
207 				logger.log(Level.FINEST, "Sending Clientheader: " + header);
208 				semiBlockingWrite(channel, magicBuf, timeout);
209 				if (magicBuf.hasRemaining()) return null;
210 				// the server will just answer with the header byte
211 				magicBuf.clear();
212 				magicBuf.limit(1);
213 			}
214 			else
215 			{
216 				// expect to read the header byte and an integer (the size of
217 				// the forthcoming adapter name string)
218 				magicBuf = ByteBuffer.allocateDirect(5);
219 			}
220 
221 			// Server: read the client header byte + sizeof adapter name
222 			// Client: read just the server header byte
223 			semiBlockingRead(channel, magicBuf, timeout);
224 			if (magicBuf.hasRemaining()) return null;
225 			magicBuf.flip();
226 			receiverHeader = new ConnectionHeader(channel, magicBuf.get());
227 
228 			// shall we act as server side and answer to the handshake request?
229 			if (!sendBeforeReceive)
230 			{
231 				// ok, we have already read the client header byte, now read the
232 				// size of the adapter name string
233 				int length = magicBuf.getInt();
234 				// client requested a special adapter?
235 				if (length > 0)
236 				{
237 					// limit the existing ByteBuffer if it's big enough
238 					if (length <= magicBuf.capacity())
239 					{
240 						magicBuf.clear();
241 						magicBuf.limit(length);
242 					}
243 					// otherwise allocate a new one
244 					else
245 					{
246 						magicBuf = ByteBuffer.allocateDirect(length);
247 					}
248 
249 					// read the adapter name
250 					semiBlockingRead(channel, magicBuf, timeout);
251 					if (magicBuf.hasRemaining()) return null;
252 					magicBuf.flip();
253 
254 					byte[] adapterArr = new byte[length];
255 					magicBuf.get(adapterArr);
256 					receiverHeader.setAdapterName(new String(adapterArr, "UTF8"));
257 				}
258 
259 				// answer to the client request and send our serverside header
260 				// byte
261 				magicBuf.limit(1);
262 				magicBuf.clear();
263 				magicBuf.put(header.toByte());
264 				magicBuf.flip();
265 				semiBlockingWrite(channel, magicBuf, timeout);
266 				if (magicBuf.hasRemaining()) return null;
267 			}
268 
269 			receiverHeader.setConnected(true);
270 		}
271 		finally
272 		{
273 			magicBuf = null;
274 		}
275 
276 		return receiverHeader;
277 	}
278 
279 	/***
280 	 * Invokes a SerializeAdapter for the given Object and handles compression
281 	 * and buffering
282 	 *
283 	 * @param adapter
284 	 * @param out
285 	 * @param obj
286 	 * @param buffered
287 	 * @param compressed
288 	 * @throws IOException
289 	 */
290 	public static void adapterSerialize(final SerializeAdapter adapter, OutputStream out, Object obj,
291 			boolean compressed, final int compressionLevel) throws IOException
292 	{
293 		logger.log(Level.FINEST, "Using compression: " + compressed);
294 
295 		if (compressed)
296 		{
297 			DeflaterOutputStream sOut = new GZIPOutputStream(out, EJConstants.BUFFERED_STREAM_SIZE)
298 				{
299 					{
300 						def.setLevel(compressionLevel);
301 					}
302 				};
303 			adapter.write(obj, sOut);
304 			sOut.finish();
305 		}
306 		else
307 		{
308 			BufferedOutputStream sOut = new BufferedOutputStream(out, EJConstants.BUFFERED_STREAM_SIZE);
309 			adapter.write(obj, sOut);
310 			sOut.flush();
311 		}
312 	}
313 
314 	/***
315 	 * Invokes a (De)SerializeAdapter for the given Object and handles
316 	 * decompression and buffering
317 	 *
318 	 * @param adapter
319 	 * @param in
320 	 * @param buffered
321 	 * @param compressed
322 	 * @return
323 	 * @throws IOException
324 	 */
325 	public static Object adapterDeserialize(final SerializeAdapter adapter, InputStream in, boolean compressed)
326 			throws IOException
327 	{
328 		InputStream sIn = null;
329 
330 		logger.log(Level.FINEST, "Using compression: " + compressed);
331 
332 		if (compressed)
333 		{
334 			sIn = new GZIPInputStream(in, EJConstants.BUFFERED_STREAM_SIZE);
335 		}
336 		else
337 		{
338 			sIn = new BufferedInputStream(in, EJConstants.BUFFERED_STREAM_SIZE);
339 		}
340 
341 		return adapter.read(sIn);
342 	}
343 
344 	/***
345 	 * Tries to send the given ByteBuffer completely through the given
346 	 * SocketChannel three times
347 	 *
348 	 * @param channel
349 	 * @param buffer
350 	 * @throws IncompleteIOException if the given ByteBuffer could not be send completely
351 	 * @throws IOException
352 	 */
353 	public static void nonBlockingWrite(WritableByteChannel channel, ByteBuffer buffer) throws IOException
354 	{
355 		int runs = 0;
356 		//setSendBufferSize(channel.socket(), buffer.remaining());
357 
358 		do
359 		{
360 			channel.write(buffer);
361 			runs++;
362 		}
363 		while (buffer.hasRemaining() && runs < EJConstants.NIO_MAX_ITERATIONS);
364 
365 		if (buffer.hasRemaining())
366 		{
367 			logger.log(Level.FINEST, "Incomplete write detected, registering for write again.");
368 			throw new IncompleteIOException(buffer, SelectionKey.OP_WRITE);
369 		}
370 	}
371 
372 	/***
373 	 * Tries to send the given ByteBuffer completely through the given
374 	 * SocketChannel within a given timeout
375 	 *
376 	 * @param channel
377 	 * @param buffer
378 	 * @param timeout
379 	 * @throws IncompleteIOException if the given ByteBuffer could not be send completely
380 	 * @throws IOException
381 	 */
382 	public static void semiBlockingWrite(WritableByteChannel channel, ByteBuffer buffer, long timeout)
383 			throws IOException
384 	{
385 		long timestamp = System.currentTimeMillis();
386 		//setSendBufferSize(channel.socket(), buffer.remaining());
387 
388 		do
389 		{
390 			channel.write(buffer);
391 		}
392 		while (buffer.hasRemaining() && ((System.currentTimeMillis() - timestamp) < timeout));
393 	}
394 
395 	/***
396 	 * Tries to send the given ByteBuffer completely through the given
397 	 * SocketChannel within a given timeout
398 	 *
399 	 * @param channel
400 	 * @param buffer
401 	 * @return
402 	 * @throws IOException
403 	 */
404 	public static ByteBuffer nonBlockingRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException
405 	{
406 		int read = 0, runs = 0;
407 
408 		try
409 		{
410 			//setReceiveBufferSize(channel.socket(), buffer.remaining());
411 
412 			do
413 			{
414 				read = channel.read(buffer);
415 				runs++;
416 			}
417 			while (buffer.hasRemaining() && runs < EJConstants.NIO_MAX_ITERATIONS && read != -1);
418 		}
419 		catch (IOException e)
420 		{
421 			// most likely the sender did close the connection
422 			throw new ClosedChannelException();
423 		}
424 
425 		if (buffer.hasRemaining())
426 		{
427 			if (read == -1)
428 			{
429 				throw new ClosedChannelException();
430 			}
431 			else
432 			{
433 				logger.log(Level.FINEST, "Incomplete read detected, registering for read again.");
434 				throw new IncompleteIOException(buffer, SelectionKey.OP_READ);
435 			}
436 		}
437 
438 		return buffer;
439 	}
440 
441 	/***
442 	 * Tries to read ByteBuffer.remaining() bytes the into given ByteBuffer from
443 	 * the given SocketChannel within a given timeout
444 	 *
445 	 * @param channel
446 	 * @param buffer
447 	 * @param timeout
448 	 * @return
449 	 * @throws IOException
450 	 */
451 	public static ByteBuffer semiBlockingRead(ReadableByteChannel channel, ByteBuffer buffer, long timeout)
452 			throws IOException
453 	{
454 		long timestamp = System.currentTimeMillis();
455 		int read = 0;
456 
457 		try
458 		{
459 			//setReceiveBufferSize(channel.socket(), buffer.remaining());
460 
461 			do
462 			{
463 				read = channel.read(buffer);
464 			}
465 			while (read != -1 && buffer.hasRemaining() && ((System.currentTimeMillis() - timestamp) < timeout));
466 		}
467 		catch (IOException e)
468 		{
469 			// most likely the sender did close the connection
470 			throw new ClosedChannelException();
471 		}
472 
473 		if (read == -1 && buffer.hasRemaining())
474 		{
475 			//either the client is too slow for us or the connection was dropped
476 			throw new ClosedChannelException();
477 		}
478 
479 		return buffer;
480 	}
481 
482 	/***
483 	 * Receives a EJOE specific header containing the size of the next
484 	 * ByteBuffer.
485 	 *
486 	 * @param channel
487 	 * @return the length of the following data package
488 	 * @throws IOException
489 	 */
490 	public static int readHeader(ConnectionHeader header) throws IOException
491 	{
492 		ByteBuffer headerBuf = ByteBuffer.allocate(EJConstants.NIO_HEADER_SIZE);
493 		int read = 0;
494 		SocketChannel channel = (SocketChannel) header.getChannel();
495 
496 		try
497 		{
498 			nonBlockingRead(channel, headerBuf);
499 			headerBuf.flip();
500 			read = headerBuf.getInt();
501 			setReceiveBufferSize(channel.socket(), read);
502 		}
503 		catch (IncompleteIOException ioe)
504 		{
505 			logger.log(Level.FINEST, "Incomplete header read detected, registering for read again.");
506 			//ioe.setIOBuffer(null);
507 			//ioe.setSelectionInterest(SelectionKey.OP_READ);
508 			throw new IncompleteIOException(null, SelectionKey.OP_READ);
509 		}
510 		finally
511 		{
512 			headerBuf = null;
513 		}
514 
515 		return read;
516 	}
517 
518 	/***
519 	 * Sends a EJOE specific header containing the lengh of the given ByteBuffer
520 	 *
521 	 * @param channel
522 	 * @param buffer
523 	 * @throws IOException
524 	 */
525 	public static void writeHeader(ConnectionHeader header, ByteBuffer buffer) throws IOException
526 	{
527 		ByteBuffer headerBuf = ByteBuffer.allocate(EJConstants.NIO_HEADER_SIZE);
528 		SocketChannel channel = (SocketChannel) header.getChannel();
529 
530 		int length = buffer != null ? buffer.remaining() : 0;
531 		headerBuf.putInt(length);
532 		headerBuf.flip();
533 		try
534 		{
535 			nonBlockingWrite(channel, headerBuf);
536 			setSendBufferSize(channel.socket(), length);
537 		}
538 		catch (IncompleteIOException ioe)
539 		{
540 			logger.log(Level.FINEST, "Incomplete header write detected, registering for write again.");
541 			//ioe.setIOBuffer(null);
542 			//ioe.setSelectionInterest(SelectionKey.OP_WRITE);
543 			throw new IncompleteIOException(null, SelectionKey.OP_WRITE);
544 		}
545 		finally
546 		{
547 			headerBuf = null;
548 		}
549 	}
550 
551 	/***
552 	 * Set the SO_SNDBUF hint on a connected socket to the size of the data which
553 	 * are expected to be written next time
554 	 * @param socket the connected socket
555 	 * @param size size to set for SO_SNDBUF
556 	 * @throws SocketException
557 	 */
558 	public static void setSendBufferSize(Socket socket, int size) throws SocketException
559 	{
560 		if (size > 0)
561 		{
562 			if (size <= EJConstants.EJOE_MAX_SOCKET_BUF_SIZE)
563 			{
564 				socket.setSendBufferSize(size);
565 			}
566 			else
567 			{
568 				socket.setSendBufferSize(EJConstants.EJOE_MAX_SOCKET_BUF_SIZE);
569 			}
570 		}
571 	}
572 
573 	/***
574 	 * Set the SO_RCVBUF hint on a connected socket to the size of the data which
575 	 * are expected to be read next time
576 	 * @param socket the connected socket
577 	 * @param size size to set for SO_RCVBUF
578 	 * @throws SocketException
579 	 */
580 	public static void setReceiveBufferSize(Socket socket, int size) throws SocketException
581 	{
582 		if (size > 0)
583 		{
584 			if (size <= EJConstants.EJOE_MAX_SOCKET_BUF_SIZE)
585 			{
586 				socket.setReceiveBufferSize(size);
587 			}
588 			else
589 			{
590 				socket.setReceiveBufferSize(EJConstants.EJOE_MAX_SOCKET_BUF_SIZE);
591 			}
592 		}
593 	}
594 }