View Javadoc

1   /**********************************************************************
2    * ConnectionWriter.java
3    * created on 05.03.2005 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/ConnectionWriter.java,v $
5    * $Date: 2006/02/12 20:45:12 $
6    * $Revision: 1.7 $
7    *
8    * ====================================================================
9    *
10   *  Copyright 2005-2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the ejoe framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  package de.netseeker.ejoe;
31  
32  import java.io.IOException;
33  import java.io.OutputStream;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.Channels;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.WritableByteChannel;
39  import java.util.logging.Level;
40  import java.util.logging.Logger;
41  
42  import de.netseeker.ejoe.adapter.AdapterFactory;
43  import de.netseeker.ejoe.adapter.SerializeAdapter;
44  import de.netseeker.ejoe.io.ByteBufferOutputStream;
45  import de.netseeker.ejoe.io.IOUtils;
46  import de.netseeker.ejoe.io.IncompleteIOException;
47  
48  /***
49   * ConnectionWriter serializes a server answer and sends it through the established connection.
50   * @author netseeker
51   * @since 0.3.1
52   */
53  final class ConnectionWriter implements Runnable
54  {
55  	private static final Logger		log	= Logger.getLogger(ConnectionReader.class.getName());
56  
57  	private final ChannelRegistrar	_registrar;
58  
59  	private final ConnectionHeader	_senderInfo;
60  
61  	private ConnectionHeader		_receiverInfo;
62  
63  	/***
64  	 * @param serverInfo
65  	 * @param clientInfo
66  	 * @param channel
67  	 * @param adapter
68  	 * @param attachment
69  	 */
70  	public ConnectionWriter(final ChannelRegistrar registrar, final ConnectionHeader senderInfo,
71  			ConnectionHeader receiverInfo)
72  	{
73  		this._registrar = registrar;
74  		this._senderInfo = senderInfo;
75  		this._receiverInfo = receiverInfo;
76  	}
77  
78  	/*
79  	 * (non-Javadoc)
80  	 *
81  	 * @see java.lang.Runnable#run()
82  	 */
83  	public void run()
84  	{
85  		try
86  		{
87  			SerializeAdapter adapter = AdapterFactory.createAdapter(this._receiverInfo.getAdapterName());
88  
89  			if (this._senderInfo.hasNonBlockingReadWrite())
90  			{
91  				write(adapter);
92  			}
93  			else
94  			{
95  				writeBlocked(adapter);
96  			}
97  
98  			if (!(this._senderInfo.isPersistent() && this._receiverInfo.isPersistent()))
99  			{
100 				log.log(Level.FINEST, "Non-persistent connection detected, closing connection...");
101 				IOUtils.closeQuite(this._receiverInfo.getChannel());
102 				this._receiverInfo = null;
103 			}
104 			else
105 			{
106 				log.log(Level.FINEST,
107 						"Persistent connection detected, registering connections for further read events...");
108 				this._receiverInfo.releaseWaitingBuffer();
109 				this._receiverInfo.releaseAttachment();
110 				this._registrar.register(this._receiverInfo, SelectionKey.OP_READ);
111 			}
112 		}
113 		catch (ClosedChannelException cce)
114 		{
115 			log.log(Level.INFO, "Channel closed by client.", cce);
116 			IOUtils.closeQuite(this._receiverInfo.getChannel());
117 			this._receiverInfo = null;
118 		}
119 		catch (IncompleteIOException ioe)
120 		{
121 			if (this._registrar.isValid())
122 			{
123 				this._receiverInfo.setWaitingBuffer(ioe.getIOBuffer());
124 				this._registrar.register(this._receiverInfo, SelectionKey.OP_WRITE);
125 			}
126 		}
127 		catch (IOException e)
128 		{
129 			log.log(Level.SEVERE, "!!! IOException while sending data to client !!!", e);
130 			IOUtils.closeQuite(this._receiverInfo.getChannel());
131 			this._receiverInfo = null;
132 		}
133 		catch (Exception e)
134 		{
135 			// something goes completely wrong!
136 			log.log(Level.SEVERE, "!!! Unknown exception while sending data !!!", e);
137 			IOUtils.closeQuite(this._receiverInfo.getChannel());
138 			this._receiverInfo = null;
139 		}
140 	}
141 
142 	/***
143 	 * @throws IOException
144 	 */
145 	private void write(SerializeAdapter adapter) throws IOException
146 	{
147 		ByteBufferOutputStream out = null;
148 		ByteBuffer dataBuf = null;
149 		WritableByteChannel channel = (WritableByteChannel) this._receiverInfo.getChannel();
150 
151 		try
152 		{
153 			if (this._receiverInfo.hasAttachment())
154 			{
155 				out = new ByteBufferOutputStream();
156 				serialize(adapter, out);
157 				this._receiverInfo.releaseAttachment();
158 				dataBuf = out.getBackingBuffer();
159 				dataBuf.flip();
160 				IOUtils.writeHeader(this._receiverInfo, dataBuf);
161 				IOUtils.nonBlockingWrite(channel, dataBuf);
162 			}
163 			else if (this._receiverInfo.hasWaitingBuffer())
164 			{
165 				dataBuf = this._receiverInfo.getWaitingBuffer();
166 				IOUtils.nonBlockingWrite(channel, dataBuf);
167 			}
168 			// seems that we have no answer for the client
169 			else
170 			{
171 				IOUtils.writeHeader(this._receiverInfo, null);
172 			}
173 
174 			log.log(Level.FINE, "Server response sent.");
175 		}
176 		finally
177 		{
178 			IOUtils.closeQuite(out);
179 		}
180 	}
181 
182 	/***
183 	 * @throws IOException
184 	 */
185 	private void writeBlocked(SerializeAdapter adapter) throws IOException
186 	{
187 		log.log(Level.FINE, "Going to send server response... ");
188 		serialize(adapter, Channels.newOutputStream((WritableByteChannel) this._receiverInfo.getChannel()));
189 		log.log(Level.FINE, "Server response sent.");
190 	}
191 
192 	/***
193 	 * @param out
194 	 * @throws IOException
195 	 */
196 	private void serialize(SerializeAdapter adapter, OutputStream out) throws IOException
197 	{
198 		boolean compressed = this._senderInfo.hasCompression() && this._receiverInfo.hasCompression();
199 		IOUtils.adapterSerialize(adapter, out, this._receiverInfo.getAttachment(), compressed, this._senderInfo
200 				.getCompressionLevel());
201 	}
202 }