View Javadoc

1   /**********************************************************************
2    * ThreadPool.java
3    * created on 07.08.2004 by netseeker
4    * $Source: /cvsroot/ejoe/EJOE/src/de/netseeker/ejoe/concurrent/ThreadPool.java,v $
5    * $Date: 2006/11/10 00:35:14 $
6    * $Revision: 1.24 $
7    *
8    * ====================================================================
9    *
10   *  Copyright 2005-2006 netseeker aka Michael Manske
11   *
12   *  Licensed under the Apache License, Version 2.0 (the "License");
13   *  you may not use this file except in compliance with the License.
14   *  You may obtain a copy of the License at
15   *
16   *      http://www.apache.org/licenses/LICENSE-2.0
17   *
18   *  Unless required by applicable law or agreed to in writing, software
19   *  distributed under the License is distributed on an "AS IS" BASIS,
20   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21   *  See the License for the specific language governing permissions and
22   *  limitations under the License.
23   * ====================================================================
24   *
25   * This file is part of the ejoe framework.
26   * For more information on the author, please see
27   * <http://www.manskes.de/>.
28   *
29   *********************************************************************/
30  
31  package de.netseeker.ejoe.concurrent;
32  
33  import java.util.logging.Level;
34  import java.util.logging.Logger;
35  
36  /***
37   * Fixed size thread pool implementation for usage with java < 1.5.0
38   * 
39   * @author netseeker aka Michael Manske
40   * @since 0.3.0
41   */
42  public class ThreadPool implements Runnable, ThreadService
43  {
44      private static final Logger logger = Logger.getLogger( ThreadPool.class.getName() );
45  
46      private final ThreadQueue   queue  = new ThreadQueue();
47  
48      private boolean             stopped;
49  
50      private ThreadGroup         threadGroup;
51  
52      private int                 numberOfThreads;
53  
54      /***
55       * @param numberOfThreads
56       * @param threadPriority
57       */
58      public ThreadPool(int numberOfThreads, int threadPriority)
59      {
60          for ( int i = 0; i < numberOfThreads; i++ )
61          {
62              startThread( threadPriority, i + 1 );
63          }
64      }
65  
66      /***
67       * @param numberOfThreads
68       */
69      public ThreadPool(int numberOfThreads)
70      {
71          for ( int i = 0; i < numberOfThreads; i++ )
72          {
73              startThread( i + 1 );
74          }
75      }
76  
77      /***
78       * 
79       */
80      public ThreadPool()
81      {
82          // typically a thread pool should have at least 1 thread
83          startThread( 1 );
84      }
85  
86      /***
87       * @param threadGroup
88       */
89      public ThreadPool(final ThreadGroup threadGroup)
90      {
91          this.threadGroup = threadGroup;
92          startThread( 1 );
93      }
94  
95      /***
96       * @param threadGroup
97       * @param numberOfThreads
98       */
99      public ThreadPool(final ThreadGroup threadGroup, int numberOfThreads)
100     {
101         this.threadGroup = threadGroup;
102         for ( int i = 0; i < numberOfThreads; i++ )
103         {
104             startThread( i + 1 );
105         }
106     }
107 
108     /***
109      * Start a new thread running
110      */
111     private Thread startThread( int index )
112     {
113         Thread thread = createThread( index );
114         thread.start();
115         this.numberOfThreads++;
116         return thread;
117     }
118 
119     /***
120      * @param priority
121      * @return
122      */
123     private Thread startThread( int priority, int index )
124     {
125         Thread thread = createThread( index );
126         thread.setPriority( priority );
127         thread.start();
128         this.numberOfThreads++;
129         return thread;
130     }
131 
132     private Thread createThread( int index )
133     {
134         Thread t = null;
135         if ( this.threadGroup != null )
136         {
137             t = new Thread( this.threadGroup, this, this.threadGroup.getName() + '(' + index + ')' );
138         }
139         else
140         {
141             t = new Thread( this );
142         }
143 
144         t.setDaemon( true );
145         return t;
146     }
147 
148     /***
149      * Signals this pool not to request and run workers from the queue
150      */
151     public void stop()
152     {
153         stopped = true;
154     }
155 
156     /***
157      * Returns number of runnable object in the queue.
158      */
159     public int getRunnableCount()
160     {
161         return queue.size();
162     }
163 
164     /***
165      * Dispatch a new task onto this pool to be invoked asynchronously later
166      */
167     public void invokeLater( Runnable task )
168     {
169         queue.add( task );
170     }
171 
172     /***
173      * @return
174      */
175     public Thread[] getAllWorkers()
176     {
177         Thread[] threads = new Thread[this.numberOfThreads];
178         int count = this.threadGroup.enumerate( threads, false );
179         Thread[] result = new Thread[count];
180         System.arraycopy( threads, 0, result, 0, count );
181 
182         return result;
183     }
184 
185     /***
186      * Returns the count of currently active worker threads
187      * 
188      * @return the count of currently active worker threads
189      */
190     public int getActiveWorkerCount()
191     {
192         return this.threadGroup.activeCount();
193     }
194 
195     /***
196      * Returns the expected pool size, which might differ from the real pool size because workers died caused by
197      * unexpected exceptions etc.
198      * 
199      * @return the expected pool size
200      */
201     public int getExpectedPoolsize()
202     {
203         return this.numberOfThreads;
204     }
205 
206     /***
207      * Returns the real pool size which is the count of all living worker threads in the pool
208      * 
209      * @return the real pool size
210      */
211     public int getCurrentPoolsize()
212     {
213         return getAllWorkers().length;
214     }
215 
216     /***
217      * Resizes the pool to the new size
218      * 
219      * @param numberOfThreads the new pool size
220      */
221     public void resize( int poolSize )
222     {
223         Thread[] workers = getAllWorkers();
224         int currentLength = workers.length;
225         if ( currentLength == poolSize )
226         {
227             logger.log( Level.FINEST, "Nothing to do, current poolsize equals requested size " + poolSize );
228             return;
229         }
230 
231         logger.log( Level.INFO, "Resizing to " + poolSize );
232 
233         if ( currentLength > poolSize )
234         {
235             int stop = currentLength - poolSize;
236 
237             for ( int i = 0; i < currentLength; i++ )
238             {
239                 if ( i > stop || workers[i] == null )
240                     break;
241                 else
242                     workers[i].interrupt();
243             }
244         }
245         else
246         {
247             while ( currentLength < poolSize )
248             {
249                 startThread( currentLength );
250                 currentLength++;
251             }
252         }
253 
254         this.numberOfThreads = getCurrentPoolsize();
255     }
256 
257     /***
258      * The method ran by the pool of background threads
259      */
260     public void run()
261     {
262         while ( !stopped && !Thread.currentThread().isInterrupted() )
263         {
264             Runnable task = queue.remove();
265             if ( task != null )
266             {
267                 try
268                 {
269                     task.run();
270                 }
271                 catch ( Throwable e )
272                 {
273                     logger.log( Level.SEVERE, "Exception occured in worker: " + task.toString(), e );
274                 }
275             }
276         }
277     }
278 }