1 /**********************************************************************
2 * ThreadPool.java
3 * created on 07.08.2004 by netseeker
4 * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/concurrent/ThreadPool.java,v $
5 * $Date: 2006/11/10 00:35:14 $
6 * $Revision: 1.24 $
7 *
8 * ====================================================================
9 *
10 * Copyright 2005-2006 netseeker aka Michael Manske
11 *
12 * Licensed under the Apache License, Version 2.0 (the "License");
13 * you may not use this file except in compliance with the License.
14 * You may obtain a copy of the License at
15 *
16 * http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 * ====================================================================
24 *
25 * This file is part of the ejoe framework.
26 * For more information on the author, please see
27 * <http://www.manskes.de/>.
28 *
29 *********************************************************************/
30
31 package de.netseeker.ejoe.concurrent;
32
33 import java.util.logging.Level;
34 import java.util.logging.Logger;
35
36 /***
37 * Fixed size thread pool implementation for usage with java < 1.5.0
38 *
39 * @author netseeker aka Michael Manske
40 * @since 0.3.0
41 */
42 public class ThreadPool implements Runnable, ThreadService
43 {
44 private static final Logger logger = Logger.getLogger( ThreadPool.class.getName() );
45
46 private final ThreadQueue queue = new ThreadQueue();
47
48 private boolean stopped;
49
50 private ThreadGroup threadGroup;
51
52 private int numberOfThreads;
53
54 /***
55 * @param numberOfThreads
56 * @param threadPriority
57 */
58 public ThreadPool(int numberOfThreads, int threadPriority)
59 {
60 for ( int i = 0; i < numberOfThreads; i++ )
61 {
62 startThread( threadPriority, i + 1 );
63 }
64 }
65
66 /***
67 * @param numberOfThreads
68 */
69 public ThreadPool(int numberOfThreads)
70 {
71 for ( int i = 0; i < numberOfThreads; i++ )
72 {
73 startThread( i + 1 );
74 }
75 }
76
77 /***
78 *
79 */
80 public ThreadPool()
81 {
82
83 startThread( 1 );
84 }
85
86 /***
87 * @param threadGroup
88 */
89 public ThreadPool(final ThreadGroup threadGroup)
90 {
91 this.threadGroup = threadGroup;
92 startThread( 1 );
93 }
94
95 /***
96 * @param threadGroup
97 * @param numberOfThreads
98 */
99 public ThreadPool(final ThreadGroup threadGroup, int numberOfThreads)
100 {
101 this.threadGroup = threadGroup;
102 for ( int i = 0; i < numberOfThreads; i++ )
103 {
104 startThread( i + 1 );
105 }
106 }
107
108 /***
109 * Start a new thread running
110 */
111 private Thread startThread( int index )
112 {
113 Thread thread = createThread( index );
114 thread.start();
115 this.numberOfThreads++;
116 return thread;
117 }
118
119 /***
120 * @param priority
121 * @return
122 */
123 private Thread startThread( int priority, int index )
124 {
125 Thread thread = createThread( index );
126 thread.setPriority( priority );
127 thread.start();
128 this.numberOfThreads++;
129 return thread;
130 }
131
132 private Thread createThread( int index )
133 {
134 Thread t = null;
135 if ( this.threadGroup != null )
136 {
137 t = new Thread( this.threadGroup, this, this.threadGroup.getName() + '(' + index + ')' );
138 }
139 else
140 {
141 t = new Thread( this );
142 }
143
144 t.setDaemon( true );
145 return t;
146 }
147
148 /***
149 * Signals this pool not to request and run workers from the queue
150 */
151 public void stop()
152 {
153 stopped = true;
154 }
155
156 /***
157 * Returns number of runnable object in the queue.
158 */
159 public int getRunnableCount()
160 {
161 return queue.size();
162 }
163
164 /***
165 * Dispatch a new task onto this pool to be invoked asynchronously later
166 */
167 public void invokeLater( Runnable task )
168 {
169 queue.add( task );
170 }
171
172 /***
173 * @return
174 */
175 public Thread[] getAllWorkers()
176 {
177 Thread[] threads = new Thread[this.numberOfThreads];
178 int count = this.threadGroup.enumerate( threads, false );
179 Thread[] result = new Thread[count];
180 System.arraycopy( threads, 0, result, 0, count );
181
182 return result;
183 }
184
185 /***
186 * Returns the count of currently active worker threads
187 *
188 * @return the count of currently active worker threads
189 */
190 public int getActiveWorkerCount()
191 {
192 return this.threadGroup.activeCount();
193 }
194
195 /***
196 * Returns the expected pool size, which might differ from the real pool size because workers died caused by
197 * unexpected exceptions etc.
198 *
199 * @return the expected pool size
200 */
201 public int getExpectedPoolsize()
202 {
203 return this.numberOfThreads;
204 }
205
206 /***
207 * Returns the real pool size which is the count of all living worker threads in the pool
208 *
209 * @return the real pool size
210 */
211 public int getCurrentPoolsize()
212 {
213 return getAllWorkers().length;
214 }
215
216 /***
217 * Resizes the pool to the new size
218 *
219 * @param numberOfThreads the new pool size
220 */
221 public void resize( int poolSize )
222 {
223 Thread[] workers = getAllWorkers();
224 int currentLength = workers.length;
225 if ( currentLength == poolSize )
226 {
227 logger.log( Level.FINEST, "Nothing to do, current poolsize equals requested size " + poolSize );
228 return;
229 }
230
231 logger.log( Level.INFO, "Resizing to " + poolSize );
232
233 if ( currentLength > poolSize )
234 {
235 int stop = currentLength - poolSize;
236
237 for ( int i = 0; i < currentLength; i++ )
238 {
239 if ( i > stop || workers[i] == null )
240 break;
241 else
242 workers[i].interrupt();
243 }
244 }
245 else
246 {
247 while ( currentLength < poolSize )
248 {
249 startThread( currentLength );
250 currentLength++;
251 }
252 }
253
254 this.numberOfThreads = getCurrentPoolsize();
255 }
256
257 /***
258 * The method ran by the pool of background threads
259 */
260 public void run()
261 {
262 while ( !stopped && !Thread.currentThread().isInterrupted() )
263 {
264 Runnable task = queue.remove();
265 if ( task != null )
266 {
267 try
268 {
269 task.run();
270 }
271 catch ( Throwable e )
272 {
273 logger.log( Level.SEVERE, "Exception occured in worker: " + task.toString(), e );
274 }
275 }
276 }
277 }
278 }