View Javadoc

1   /**********************************************************************
2    * ConnectionWriter.java
3    * created on 05.03.2005 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/core/ConnectionWriter.java,v $
5    * $Date: 2007/03/25 15:03:20 $
6    * $Revision: 1.4 $
7    *
8    * ====================================================================
9    *
10   *  Copyright 2005-2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the ejoe framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  package de.netseeker.ejoe.core;
31  
32  import java.io.IOException;
33  import java.io.OutputStream;
34  import java.net.SocketTimeoutException;
35  import java.nio.ByteBuffer;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.SelectionKey;
39  import java.nio.channels.WritableByteChannel;
40  import java.rmi.RemoteException;
41  import java.util.logging.Level;
42  import java.util.logging.Logger;
43  
44  import de.netseeker.ejoe.ConnectionHeader;
45  import de.netseeker.ejoe.EJConstants;
46  import de.netseeker.ejoe.ServerInfo;
47  import de.netseeker.ejoe.adapter.AdapterFactory;
48  import de.netseeker.ejoe.adapter.SerializeAdapter;
49  import de.netseeker.ejoe.cache.ByteBufferAllocator;
50  import de.netseeker.ejoe.io.ByteBufferOutputStream;
51  import de.netseeker.ejoe.io.DataChannel;
52  import de.netseeker.ejoe.io.IOUtil;
53  import de.netseeker.ejoe.io.IncompleteIOException;
54  
55  /***
56   * ConnectionWriter serializes a server answer and sends it through the established connection.
57   * 
58   * @author netseeker
59   * @since 0.3.1
60   */
61  final class ConnectionWriter implements Runnable
62  {
63      private static final Logger    log = Logger.getLogger( ConnectionReader.class.getName() );
64  
65      private final ChannelRegistrar _registrar;
66  
67      private final ConnectionHeader _senderInfo;
68  
69      private ConnectionHeader       _receiverInfo;
70  
71      /***
72       * Creates a new instance of ConnectionWriter
73       * 
74       * @param serverInfo
75       * @param clientInfo
76       * @param channel
77       * @param adapter
78       * @param attachment
79       */
80      public ConnectionWriter(final ChannelRegistrar registrar, final ServerInfo senderInfo, ConnectionHeader receiverInfo)
81      {
82          this._senderInfo = senderInfo;
83          this._receiverInfo = receiverInfo;
84          this._registrar = registrar;
85      }
86  
87      /*
88       * (non-Javadoc)
89       * 
90       * @see java.lang.Runnable#run()
91       */
92      public void run()
93      {
94          try
95          {
96              if ( this._senderInfo.hasNonBlockingReadWrite() )
97              {
98                  write();
99              }
100             else
101             {
102                 writeBlocked();
103             }
104 
105             this._receiverInfo.releaseWaitingBuffer();
106             this._receiverInfo.releaseAttachment();
107 
108             if ( !this._senderInfo.isPersistent() || !this._receiverInfo.isPersistent() )
109             {
110                 log.log( Level.FINEST, "Non-persistent connection detected, closing connection..." );
111                 shutdownConnection();
112             }
113             else if ( this._registrar.isValid() )
114             {
115                 log.log( Level.FINEST,
116                          "Persistent connection detected, registering connections for further read events..." );
117                 this._registrar.register( this._receiverInfo, SelectionKey.OP_READ );
118             }
119         }
120         catch ( ClosedChannelException cce )
121         {
122             log.log( Level.INFO, "Channel closed by client.", cce );
123             shutdownConnection();
124         }
125         catch ( IncompleteIOException ioe )
126         {
127             if ( this._registrar.isValid() )
128             {
129                 this._receiverInfo.setWaitingBuffer( ioe.getIOBuffer() );
130                 this._registrar.register( this._receiverInfo, SelectionKey.OP_WRITE );
131             }
132         }
133         catch ( SocketTimeoutException ste )
134         {
135             log.log( Level.FINE, "Timeout occured while sending data!", ste );
136             shutdownConnection();
137         }
138         catch ( RemoteException re )
139         {
140             this._receiverInfo.releaseWaitingBuffer();
141             this._receiverInfo.setAttachment( re );
142             this._registrar.register( this._receiverInfo, SelectionKey.OP_WRITE );
143         }
144         catch ( IOException e )
145         {
146             log.log( Level.WARNING,
147                      "!!! IOException while sending data to client propably the client just closed the connection!!!",
148                      e );
149             shutdownConnection();
150         }
151         catch ( Throwable e )
152         {
153             // something goes completely wrong!
154             log.log( Level.SEVERE, "!!! Unknown exception while sending data !!!", e );
155             shutdownConnection();
156         }
157     }
158 
159     private void shutdownConnection()
160     {
161         IOUtil.closeQuiet( this._receiverInfo.getChannel() );
162         this._receiverInfo = null;
163     }
164 
165     /***
166      * @throws IOException
167      */
168     private void write() throws IOException
169     {
170         ByteBufferOutputStream out = null;
171         ByteBuffer dataBuf = null;
172         SerializeAdapter adapter = this._receiverInfo.getAdapterName() != null ? AdapterFactory
173                 .createAdapter( this._receiverInfo.getAdapterName() ) : null;
174         WritableByteChannel channel = this._receiverInfo.getChannel();
175         DataChannel dataChannel = DataChannel.getInstance( this._receiverInfo );
176 
177         try
178         {
179             if ( this._receiverInfo.hasAttachment() )
180             {
181                 // usual way: convert the serialized object to a ByteBuffer
182                 if ( !this._receiverInfo.isDirect() )
183                 {
184                     out = new ByteBufferOutputStream();
185                     serialize( adapter, out );
186                     dataBuf = out.getBackingBuffer();
187                 }
188                 // direct mode: just use the attachement
189                 else
190                 {
191                     ByteBuffer tmp = (ByteBuffer) this._receiverInfo.getAttachment();
192                     if ( tmp.position() > 0 )
193                     {
194                         tmp.flip();
195                     }
196                     dataBuf = tmp.duplicate();
197                     ByteBufferAllocator.collect( tmp );
198                 }
199 
200                 if ( dataBuf.position() > 0 )
201                 {
202                     dataBuf.flip();
203                 }
204 
205                 this._receiverInfo.releaseAttachment();
206 
207                 dataChannel.writeHeader( this._receiverInfo, dataBuf, EJConstants.EJOE_CONNECTION_TIMEOUT );
208 
209                 if ( log.isLoggable( Level.FINE ) )
210                 {
211                     byte[] tmp = new byte[dataBuf.remaining()];
212                     dataBuf.get( tmp );
213                     dataBuf.position( 0 );
214                     log.log( Level.FINE, "Going to send server response:\n"
215                             + new String( tmp, EJConstants.EJOE_DEFAULT_CHARSET ) );
216                 }
217 
218                 dataChannel.nonBlockingWrite( channel, dataBuf );
219             }
220             else if ( this._receiverInfo.hasWaitingBuffer() )
221             {
222                 dataBuf = this._receiverInfo.getWaitingBuffer();
223                 dataChannel.nonBlockingWrite( channel, dataBuf );
224             }
225             // seems that we have no answer for the client
226             else
227             {
228                 dataChannel.writeHeader( this._receiverInfo, null, EJConstants.EJOE_CONNECTION_TIMEOUT );
229             }
230 
231             log.log( Level.FINE, "Server response sent." );
232         }
233         finally
234         {
235             IOUtil.closeQuiet( out );
236         }
237     }
238 
239     /***
240      * @throws IOException
241      */
242     private void writeBlocked() throws IOException
243     {
244         SerializeAdapter adapter = this._receiverInfo.getAdapterName() != null ? AdapterFactory
245                 .createAdapter( this._receiverInfo.getAdapterName() ) : null;
246 
247         log.log( Level.FINE, "Going to send server response... " );
248         if ( !this._receiverInfo.isDirect() )
249         {
250             serialize( adapter, Channels.newOutputStream( this._receiverInfo.getChannel() ) );
251         }
252         // direct mode: just use the attachement
253         else
254         {
255             ByteBuffer tmp = (ByteBuffer) this._receiverInfo.getAttachment();
256             if ( tmp.position() > 0 )
257             {
258                 tmp.flip();
259             }
260             IOUtil.writeDirect( Channels.newOutputStream( this._receiverInfo.getChannel() ), tmp );
261         }
262 
263         log.log( Level.FINE, "Server response sent." );
264     }
265 
266     /***
267      * @param out
268      * @throws IOException
269      */
270     private void serialize( SerializeAdapter adapter, OutputStream out ) throws IOException
271     {
272         boolean compressed = this._senderInfo.hasCompression() && this._receiverInfo.hasCompression();
273         try
274         {
275             IOUtil.adapterSerialize( adapter, out, this._receiverInfo.getAttachment(), compressed, this._senderInfo
276                     .getCompressionLevel() );
277         }
278         catch ( IOException ioe )
279         {
280             throw ioe;
281         }
282         catch ( Throwable t )
283         {
284             throw new RemoteException( "The server encounteres an error while serializing the server response!", t );
285         }
286     }
287 }