1 /**********************************************************************
2 * IOUtils.java
3 * created on 15.08.2004 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/io/IOUtils.java,v $
5 * $Date: 2006/02/04 14:15:53 $
6 * $Revision: 1.32 $
7 *********************************************************************/
8
9 package de.netseeker.ejoe.io;
10
11 import java.io.BufferedInputStream;
12 import java.io.BufferedOutputStream;
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.io.OutputStream;
16 import java.io.Reader;
17 import java.io.Writer;
18 import java.net.Socket;
19 import java.net.SocketException;
20 import java.nio.ByteBuffer;
21 import java.nio.channels.Channel;
22 import java.nio.channels.ClosedChannelException;
23 import java.nio.channels.ReadableByteChannel;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.nio.channels.WritableByteChannel;
28 import java.util.logging.Level;
29 import java.util.logging.Logger;
30 import java.util.zip.DeflaterOutputStream;
31 import java.util.zip.GZIPInputStream;
32 import java.util.zip.GZIPOutputStream;
33
34 import de.netseeker.ejoe.ConnectionHeader;
35 import de.netseeker.ejoe.EJConstants;
36 import de.netseeker.ejoe.adapter.SerializeAdapter;
37
38 /***
39 * Some useful methods for closing io streams, io readers, channels and Selector
40 * quietly, as well as methods for non-blocking, semi-blocking and full blocking
41 * io read/write operations.
42 *
43 * @author netseeker aka Michael Manske
44 */
45 public final class IOUtils
46 {
/**
 * Iteration limit declared by this class (value 3).
 * NOTE(review): the retry loops in nonBlockingWrite/nonBlockingRead read
 * EJConstants.NIO_MAX_ITERATIONS rather than this field - confirm whether
 * this constant is still referenced anywhere.
 */
47 public static final int NIO_MAX_ITERATIONS = 3;
48
/** Class-wide java.util.logging logger named after this class; the methods below log at FINEST. */
49 private static final Logger logger = Logger.getLogger(IOUtils.class.getName());
50
51 /***
52 * Tries to close an OutputStream and handles null values and IOExceptions
53 * quietly
54 *
55 * @param out
56 */
57 public static void closeQuite(OutputStream out)
58 {
59 if (out != null)
60 {
61 try
62 {
63 out.close();
64 }
65 catch (IOException e)
66 {
67
68 }
69 }
70 }
71
72 /***
73 * Tries to close a Writer and handles null values and IOExceptions quietly
74 *
75 * @param out
76 */
77 public static void closeQuite(Writer out)
78 {
79 if (out != null)
80 {
81 try
82 {
83 out.close();
84 }
85 catch (IOException e)
86 {
87
88 }
89 }
90 }
91
92 /***
93 * Tries to close an InputStream and handles null values and IOExceptions
94 * quietly
95 *
96 * @param in
97 */
98 public static void closeQuite(InputStream in)
99 {
100 if (in != null)
101 {
102 try
103 {
104 in.close();
105 }
106 catch (IOException e)
107 {
108
109 }
110 }
111 }
112
113 /***
114 * Tries to close a Reader and handles null values and IOExceptions quietly
115 *
116 * @param reader
117 */
118 public static void closeQuite(Reader reader)
119 {
120 if (reader != null)
121 {
122 try
123 {
124 reader.close();
125 }
126 catch (IOException e)
127 {
128
129 }
130 }
131 }
132
133 /***
134 * Tries to close an NIO Channel and handles null values and IOExceptions
135 * quietly
136 *
137 * @param channel
138 */
139 public static void closeQuite(Channel channel)
140 {
141 if (channel != null)
142 {
143 try
144 {
145 channel.close();
146 }
147 catch (IOException e)
148 {
149
150 }
151 }
152 }
153
154 /***
155 * Tries to close a NIO Selector and handles null values and IOExceptions
156 * quietly
157 *
158 * @param selector
159 */
160 public static void closeQuite(Selector selector)
161 {
162 if (selector != null && selector.isOpen())
163 {
164 try
165 {
166 selector.close();
167 }
168 catch (IOException e)
169 {
170
171 }
172 }
173 }
174
175 /***
176 * Handshake for a socket channel. It's used as workaround for a know issue
177 * with java sockets: Sometimes only the first byte will get transferred
178 * through a socket connection when reading from it first time. The other
179 * bytes will follow not until the next read. This method sends/receives one
180 * Byte through the socket to "initialize" the socket channel. So all
181 * following read/write operations don't have to handle that "1-Byte issue".
182 * The send/received Byte is used also as connection header, it contains
183 * information about compression, nio usage, if the connection is a
184 * persistent or non-persistent one...
185 *
186 * @param channel
187 * A connected, readable and writeable socket channel
188 * @param sendBeforeReceive
189 * if true we will try to send one byte then read one byte
190 * otherwise we will use the opposite way around.
191 * @throws IOException
192 */
193 public static ConnectionHeader handshake(SocketChannel channel, final ConnectionHeader header,
194 boolean sendBeforeReceive, long timeout) throws IOException
195 {
196 ConnectionHeader receiverHeader;
197 ByteBuffer magicBuf = null;
198
199 try
200 {
201
202 if (sendBeforeReceive)
203 {
204
205
206 magicBuf = header.toByteBuffer();
207 logger.log(Level.FINEST, "Sending Clientheader: " + header);
208 semiBlockingWrite(channel, magicBuf, timeout);
209 if (magicBuf.hasRemaining()) return null;
210
211 magicBuf.clear();
212 magicBuf.limit(1);
213 }
214 else
215 {
216
217
218 magicBuf = ByteBuffer.allocateDirect(5);
219 }
220
221
222
223 semiBlockingRead(channel, magicBuf, timeout);
224 if (magicBuf.hasRemaining()) return null;
225 magicBuf.flip();
226 receiverHeader = new ConnectionHeader(channel, magicBuf.get());
227
228
229 if (!sendBeforeReceive)
230 {
231
232
233 int length = magicBuf.getInt();
234
235 if (length > 0)
236 {
237
238 if (length <= magicBuf.capacity())
239 {
240 magicBuf.clear();
241 magicBuf.limit(length);
242 }
243
244 else
245 {
246 magicBuf = ByteBuffer.allocateDirect(length);
247 }
248
249
250 semiBlockingRead(channel, magicBuf, timeout);
251 if (magicBuf.hasRemaining()) return null;
252 magicBuf.flip();
253
254 byte[] adapterArr = new byte[length];
255 magicBuf.get(adapterArr);
256 receiverHeader.setAdapterName(new String(adapterArr, "UTF8"));
257 }
258
259
260
261 magicBuf.limit(1);
262 magicBuf.clear();
263 magicBuf.put(header.toByte());
264 magicBuf.flip();
265 semiBlockingWrite(channel, magicBuf, timeout);
266 if (magicBuf.hasRemaining()) return null;
267 }
268
269 receiverHeader.setConnected(true);
270 }
271 finally
272 {
273 magicBuf = null;
274 }
275
276 return receiverHeader;
277 }
278
279 /***
280 * Invokes a SerializeAdapter for the given Object and handles compression
281 * and buffering
282 *
283 * @param adapter
284 * @param out
285 * @param obj
286 * @param buffered
287 * @param compressed
288 * @throws IOException
289 */
290 public static void adapterSerialize(final SerializeAdapter adapter, OutputStream out, Object obj,
291 boolean compressed, final int compressionLevel) throws IOException
292 {
293 logger.log(Level.FINEST, "Using compression: " + compressed);
294
295 if (compressed)
296 {
297 DeflaterOutputStream sOut = new GZIPOutputStream(out, EJConstants.BUFFERED_STREAM_SIZE)
298 {
299 {
300 def.setLevel(compressionLevel);
301 }
302 };
303 adapter.write(obj, sOut);
304 sOut.finish();
305 }
306 else
307 {
308 BufferedOutputStream sOut = new BufferedOutputStream(out, EJConstants.BUFFERED_STREAM_SIZE);
309 adapter.write(obj, sOut);
310 sOut.flush();
311 }
312 }
313
314 /***
315 * Invokes a (De)SerializeAdapter for the given Object and handles
316 * decompression and buffering
317 *
318 * @param adapter
319 * @param in
320 * @param buffered
321 * @param compressed
322 * @return
323 * @throws IOException
324 */
325 public static Object adapterDeserialize(final SerializeAdapter adapter, InputStream in, boolean compressed)
326 throws IOException
327 {
328 InputStream sIn = null;
329
330 logger.log(Level.FINEST, "Using compression: " + compressed);
331
332 if (compressed)
333 {
334 sIn = new GZIPInputStream(in, EJConstants.BUFFERED_STREAM_SIZE);
335 }
336 else
337 {
338 sIn = new BufferedInputStream(in, EJConstants.BUFFERED_STREAM_SIZE);
339 }
340
341 return adapter.read(sIn);
342 }
343
344 /***
345 * Tries to send the given ByteBuffer completely through the given
346 * SocketChannel three times
347 *
348 * @param channel
349 * @param buffer
350 * @throws IncompleteIOException if the given ByteBuffer could not be send completely
351 * @throws IOException
352 */
353 public static void nonBlockingWrite(WritableByteChannel channel, ByteBuffer buffer) throws IOException
354 {
355 int runs = 0;
356
357
358 do
359 {
360 channel.write(buffer);
361 runs++;
362 }
363 while (buffer.hasRemaining() && runs < EJConstants.NIO_MAX_ITERATIONS);
364
365 if (buffer.hasRemaining())
366 {
367 logger.log(Level.FINEST, "Incomplete write detected, registering for write again.");
368 throw new IncompleteIOException(buffer, SelectionKey.OP_WRITE);
369 }
370 }
371
372 /***
373 * Tries to send the given ByteBuffer completely through the given
374 * SocketChannel within a given timeout
375 *
376 * @param channel
377 * @param buffer
378 * @param timeout
379 * @throws IncompleteIOException if the given ByteBuffer could not be send completely
380 * @throws IOException
381 */
382 public static void semiBlockingWrite(WritableByteChannel channel, ByteBuffer buffer, long timeout)
383 throws IOException
384 {
385 long timestamp = System.currentTimeMillis();
386
387
388 do
389 {
390 channel.write(buffer);
391 }
392 while (buffer.hasRemaining() && ((System.currentTimeMillis() - timestamp) < timeout));
393 }
394
395 /***
396 * Tries to send the given ByteBuffer completely through the given
397 * SocketChannel within a given timeout
398 *
399 * @param channel
400 * @param buffer
401 * @return
402 * @throws IOException
403 */
404 public static ByteBuffer nonBlockingRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException
405 {
406 int read = 0, runs = 0;
407
408 try
409 {
410
411
412 do
413 {
414 read = channel.read(buffer);
415 runs++;
416 }
417 while (buffer.hasRemaining() && runs < EJConstants.NIO_MAX_ITERATIONS && read != -1);
418 }
419 catch (IOException e)
420 {
421
422 throw new ClosedChannelException();
423 }
424
425 if (buffer.hasRemaining())
426 {
427 if (read == -1)
428 {
429 throw new ClosedChannelException();
430 }
431 else
432 {
433 logger.log(Level.FINEST, "Incomplete read detected, registering for read again.");
434 throw new IncompleteIOException(buffer, SelectionKey.OP_READ);
435 }
436 }
437
438 return buffer;
439 }
440
441 /***
442 * Tries to read ByteBuffer.remaining() bytes the into given ByteBuffer from
443 * the given SocketChannel within a given timeout
444 *
445 * @param channel
446 * @param buffer
447 * @param timeout
448 * @return
449 * @throws IOException
450 */
451 public static ByteBuffer semiBlockingRead(ReadableByteChannel channel, ByteBuffer buffer, long timeout)
452 throws IOException
453 {
454 long timestamp = System.currentTimeMillis();
455 int read = 0;
456
457 try
458 {
459
460
461 do
462 {
463 read = channel.read(buffer);
464 }
465 while (read != -1 && buffer.hasRemaining() && ((System.currentTimeMillis() - timestamp) < timeout));
466 }
467 catch (IOException e)
468 {
469
470 throw new ClosedChannelException();
471 }
472
473 if (read == -1 && buffer.hasRemaining())
474 {
475
476 throw new ClosedChannelException();
477 }
478
479 return buffer;
480 }
481
482 /***
483 * Receives a EJOE specific header containing the size of the next
484 * ByteBuffer.
485 *
486 * @param channel
487 * @return the length of the following data package
488 * @throws IOException
489 */
490 public static int readHeader(ConnectionHeader header) throws IOException
491 {
492 ByteBuffer headerBuf = ByteBuffer.allocate(EJConstants.NIO_HEADER_SIZE);
493 int read = 0;
494 SocketChannel channel = (SocketChannel) header.getChannel();
495
496 try
497 {
498 nonBlockingRead(channel, headerBuf);
499 headerBuf.flip();
500 read = headerBuf.getInt();
501 setReceiveBufferSize(channel.socket(), read);
502 }
503 catch (IncompleteIOException ioe)
504 {
505 logger.log(Level.FINEST, "Incomplete header read detected, registering for read again.");
506
507
508 throw new IncompleteIOException(null, SelectionKey.OP_READ);
509 }
510 finally
511 {
512 headerBuf = null;
513 }
514
515 return read;
516 }
517
518 /***
519 * Sends a EJOE specific header containing the lengh of the given ByteBuffer
520 *
521 * @param channel
522 * @param buffer
523 * @throws IOException
524 */
525 public static void writeHeader(ConnectionHeader header, ByteBuffer buffer) throws IOException
526 {
527 ByteBuffer headerBuf = ByteBuffer.allocate(EJConstants.NIO_HEADER_SIZE);
528 SocketChannel channel = (SocketChannel) header.getChannel();
529
530 int length = buffer != null ? buffer.remaining() : 0;
531 headerBuf.putInt(length);
532 headerBuf.flip();
533 try
534 {
535 nonBlockingWrite(channel, headerBuf);
536 setSendBufferSize(channel.socket(), length);
537 }
538 catch (IncompleteIOException ioe)
539 {
540 logger.log(Level.FINEST, "Incomplete header write detected, registering for write again.");
541
542
543 throw new IncompleteIOException(null, SelectionKey.OP_WRITE);
544 }
545 finally
546 {
547 headerBuf = null;
548 }
549 }
550
551 /***
552 * Set the SO_SNDBUF hint on a connected socket to the size of the data which
553 * are expected to be written next time
554 * @param socket the connected socket
555 * @param size size to set for SO_SNDBUF
556 * @throws SocketException
557 */
558 public static void setSendBufferSize(Socket socket, int size) throws SocketException
559 {
560 if (size > 0)
561 {
562 if (size <= EJConstants.EJOE_MAX_SOCKET_BUF_SIZE)
563 {
564 socket.setSendBufferSize(size);
565 }
566 else
567 {
568 socket.setSendBufferSize(EJConstants.EJOE_MAX_SOCKET_BUF_SIZE);
569 }
570 }
571 }
572
573 /***
574 * Set the SO_RCVBUF hint on a connected socket to the size of the data which
575 * are expected to be read next time
576 * @param socket the connected socket
577 * @param size size to set for SO_RCVBUF
578 * @throws SocketException
579 */
580 public static void setReceiveBufferSize(Socket socket, int size) throws SocketException
581 {
582 if (size > 0)
583 {
584 if (size <= EJConstants.EJOE_MAX_SOCKET_BUF_SIZE)
585 {
586 socket.setReceiveBufferSize(size);
587 }
588 else
589 {
590 socket.setReceiveBufferSize(EJConstants.EJOE_MAX_SOCKET_BUF_SIZE);
591 }
592 }
593 }
594 }