View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    * http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package net.sourceforge.basher.internal.impl;
16  
17  import java.util.HashSet;
18  import java.util.Set;
19  import java.util.concurrent.Semaphore;
20  import java.util.concurrent.atomic.AtomicBoolean;
21  
22  import net.sourceforge.basher.Basher;
23  import net.sourceforge.basher.TaskExecutionContext;
24  import net.sourceforge.basher.TaskManager;
25  import net.sourceforge.basher.events.*;
26  import net.sourceforge.basher.internal.Randomizer;
27  import net.sourceforge.basher.internal.TaskInvoker;
28  import net.sourceforge.basher.internal.TaskRunner;
29  import org.apache.commons.logging.Log;
30  import org.ops4j.gaderian.events.RegistryShutdownListener;
31  
32  /**
33   * @author Johan Lindquist
34   * @version 1.0
35   */
36  public class TaskRunnerImpl implements TaskRunner, BasherEventListener, RegistryShutdownListener
37  {
38      private Randomizer _randomizer;
39      private TaskManager _taskManager;
40      private TaskInvoker _taskInvoker;
41      private Log _logger;
42      private int _minTime;
43      private int _maxDelay;
44  
45      private final Set<String> _threadsShutdown = new HashSet<String>();
46  
47      private final Semaphore _semaphore = new Semaphore(0);
48  
49      private final AtomicBoolean _keepRunning = new AtomicBoolean(true);
50      public int numberOfThreadsRunning = 0;
51  
52      public void setRandomizer(final Randomizer randomizer)
53      {
54          _randomizer = randomizer;
55      }
56  
57      public void setLog(final Log logger)
58      {
59          _logger = logger;
60      }
61  
62      public void setTaskManager(final TaskManager taskManager)
63      {
64          _taskManager = taskManager;
65      }
66  
67      public void setTaskInvoker(final TaskInvoker taskInvoker)
68      {
69          _taskInvoker = taskInvoker;
70      }
71  
72  
73      public void stopInvoking()
74      {
75          // And cancel running too
76          _keepRunning.set(false);
77      }
78  
79      /**
80       *
81       */
82      public void run()
83      {
84          // This call actually blocks if we are not yet running
85          while (keepProcessing())
86          {
87              try
88              {
89                  // Retrieve the next task to run
90                  final TaskExecutionContext taskExecutionContext = _taskManager.getNextTaskExecutionContext();
91  
92                  // Task Manager API will return null if not tasks are available
93                  if ( taskExecutionContext != null)
94                  {
95                      _taskInvoker.invokeTask( taskExecutionContext );
96                  }
97  
98                  // Simple check to ensure we dont overdo things ...
99                  if (_keepRunning.get())
100                 {
101                     Basher.fireCleanUpThread();
102                     Thread.sleep(getRandomSleepTime());
103                 }
104             }
105             catch (final Throwable e)
106             {
107                 _logger.error(e.getMessage(), e);
108             }
109         }
110     }
111 
112     private boolean keepProcessing()
113     {
114         try
115         {
116             _semaphore.acquire();
117             try
118             {
119                 // Once we wake up, check if we need to shutdown
120                 final String threadName = Thread.currentThread().getName();
121                 if (_threadsShutdown.contains(threadName))
122                 {
123                     _threadsShutdown.remove(threadName);
124                     // Remove one more permit
125                     _semaphore.acquire();
126                     return false;
127                 }
128                 return _keepRunning.get();
129             }
130             finally
131             {
132                 _semaphore.release();
133             }
134 
135         }
136         catch (final InterruptedException e)
137         {
138             //
139         }
140         return false;
141     }
142 
143     private long getRandomSleepTime()
144     {
145         // Should create (_minTime < sleepTime < (_minTime + _maxDelay))
146         return _minTime + _randomizer.getRandomInt(_maxDelay);
147     }
148 
149     public void basherEvent(final BasherEvent basherEvent)
150     {
151         if (basherEvent instanceof PhaseTransitionEvent)
152         {
153             // Anything?
154             final PhaseTransitionEvent phaseTransitionEvent = (PhaseTransitionEvent) basherEvent;
155             _maxDelay = phaseTransitionEvent.getBasherContext().getTaskMaxDelay();
156             _minTime = phaseTransitionEvent.getBasherContext().getTaskMinDelay();
157 
158             switch (phaseTransitionEvent.getNewPhase())
159             {
160                 case START:
161                     numberOfThreadsRunning = phaseTransitionEvent.getBasherContext().getInitialNumberThreads();
162                     break;
163                 case SETUP:
164                     break;
165                 case RUN:
166                     // Release all waiting threads
167                     _logger.info("Number of available permits: " + _semaphore.availablePermits());
168                     _logger.info("Number of threads: " + numberOfThreadsRunning);
169                     _semaphore.release(numberOfThreadsRunning);
170                     break;
171                 case COOLDOWN:
172                 case TEARDOWN:
173                     try
174                     {
175                         _semaphore.acquire(_semaphore.availablePermits());
176                     }
177                     catch (final InterruptedException e)
178                     {
179                         e.printStackTrace();
180                     }
181                     break;
182                 case END:
183                     flipProcessing(false);
184                 default:
185             }
186         }
187         else if (basherEvent instanceof ThreadAddedEvent)
188         {
189             numberOfThreadsRunning = ((ThreadAddedEvent)basherEvent).getCurrentNumberThreads();
190             _semaphore.release(1);
191         }
192         else if (basherEvent instanceof ThreadRemovedEvent)
193         {
194             final ThreadRemovedEvent threadRemovedEvent = (ThreadRemovedEvent) basherEvent;
195             numberOfThreadsRunning = threadRemovedEvent.getCurrentNumberThreads();
196             final String threadName = threadRemovedEvent.getName();
197             // Simple check to see if the thread is already awaiting shutdown
198             if (!_threadsShutdown.contains(threadName))
199             {
200                 _logger.info("Thread " + threadName + " shutting down on request");
201                 _threadsShutdown.add(threadName);
202             }
203         }
204         else if (basherEvent instanceof NoTasksAvailableEvent)
205         {
206             flipProcessing(false);
207         }
208         else if (basherEvent instanceof TasksAvailableEvent)
209         {
210             // Release all the waiting threads
211             flipProcessing(true);
212         }
213     }
214 
215     private void flipProcessing(final boolean state)
216     {
217         if (state)
218         {
219             _semaphore.release(numberOfThreadsRunning);
220         }
221         else
222         {
223             if (_semaphore.availablePermits() == 0)
224             {
225                 // No need to acquire any - all threads are holding at this stage
226             }
227             else
228             {
229                 try
230                 {
231                     _semaphore.acquire(_semaphore.availablePermits());
232                 }
233                 catch (final InterruptedException e)
234                 {
235                     e.printStackTrace();
236                 }
237             }
238         }
239     }
240 
241     public void registryDidShutdown()
242     {
243         stopInvoking();
244     }
245 }