View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    * http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  /*
16   * Licensed under the Apache License, Version 2.0 (the "License");
17   * you may not use this file except in compliance with the License.
18   * You may obtain a copy of the License at
19   *
20   * http://www.apache.org/licenses/LICENSE-2.0
21   *
22   * Unless required by applicable law or agreed to in writing, software
23   * distributed under the License is distributed on an "AS IS" BASIS,
24   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25   * See the License for the specific language governing permissions and
26   * limitations under the License.
27   */
28  
29  /*
30   *
31   *
32   * Licensed under the Apache License, Version 2.0 (the "License");
33   * you may not use this file except in compliance with the License.
34   * You may obtain a copy of the License at
35   *
36   * 	http://www.apache.org/licenses/LICENSE-2.0
37   *
38   * Unless required by applicable law or agreed to in writing, software
39   * distributed under the License is distributed on an "AS IS" BASIS,
40   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
41   * See the License for the specific language governing permissions and
42   * limitations under the License.
43   */
44  package net.sourceforge.basher.impl;
45  
46  import java.util.ArrayList;
47  import java.util.List;
48  import java.util.UUID;
49  import java.util.concurrent.Executors;
50  import java.util.concurrent.ScheduledExecutorService;
51  import java.util.concurrent.ScheduledFuture;
52  import java.util.concurrent.TimeUnit;
53  
54  import net.sourceforge.basher.BasherContext;
55  import net.sourceforge.basher.ContextManager;
56  
57  import static net.sourceforge.basher.Phase.*;
58  
59  import net.sourceforge.basher.Scheduler;
60  import net.sourceforge.basher.Phase;
61  import net.sourceforge.basher.events.*;
62  import net.sourceforge.basher.internal.TaskRunner;
63  import net.sourceforge.basher.internal.tasks.ThreadIncrementTask;
64  import org.apache.commons.logging.Log;
65  
66  /**
67   * Implementation of the <code>Scheduler</code> interface.
68   *
69   * @author Johan Lindquist
70   * @version 1.0
71   */
72  public class SchedulerImpl implements Scheduler, BasherEventListener
73  {
    // Collaborators injected through the setters below; checkStartPrecondition() refuses to
    // start while any of these four is still null
    private Log _logger;
    private ContextManager _contextManager;
    private EventManager _eventManager;
    private TaskRunner _taskRunner;

    // Names of the task-runner threads currently counted as active (appended on add, removed from the front)
    private List<String> _threadNames;
    // Thread group the task-runner threads are created in; created by start(...), nulled when stopping
    private ThreadGroup _threadGroup;

    // Ever-increasing counter used to build the "TaskRunner-N" thread names
    private int _threadCounter = 0;
    // Set to true by start(...) and back to false when the scheduler stops
    private boolean _running = false;

    // Phase carried by the most recently received PhaseTransitionEvent; reset to null by start(...)
    private Phase _currentState = null;

    // Periodic tasks that are scheduled when the RUN phase is entered
    private TickTimerTask _tickTimerTask;
    private ThreadIncrementTask _threadIncrementTask;

    // Executor used for all timed work; defaults to a pool of five threads and can be replaced via its setter
    private ScheduledExecutorService _scheduledExecutorService = Executors.newScheduledThreadPool(5);

    // Handles of the work scheduled during the RUN phase, kept so it can be cancelled later
    private ScheduledFuture<?> _collectionStartScheduledFuture;
    private ScheduledFuture<?> _collectionStopScheduledFuture;
    private ScheduledFuture<?> _tickTimerTaskScheduledFuture;
    private ScheduledFuture<?> _threadIncrementTaskScheduledFuture;

    // Handle of the most recently scheduled phase-transition event
    private ScheduledFuture<?> _lastScheduledPhaseTransitionFuture;
98  
99  
100     /**
101      * {@inheritDoc}
102      */
103     public synchronized void addThread()
104     {
105         checkInitialized();
106         addNewThread(_contextManager.getActiveBasherContext());
107     }
108 
109     /**
110      * {@inheritDoc}
111      */
112     public void addThreads(final int numToAdd)
113     {
114         checkInitialized();
115         _logger.debug("Adding " + numToAdd + " thread(s)");
116         for (int i = 0; i < numToAdd; i++)
117         {
118             addThread();
119         }
120         _logger.debug(numToAdd + " thread(s) added");
121     }
122 
123     /**
124      * {@inheritDoc}
125      */
126     public synchronized void removeThread()
127     {
128         checkInitialized();
129 
130         if (_threadNames.size() > 0)
131         {
132             _logger.debug("Removing thread from active list");
133             String threadName = _threadNames.remove(0);
134             _logger.debug("Thread " + threadName + " removed from active list");
135             _logger.debug("Signalling stop thread for thread named: " + threadName);
136             _eventManager.publish(new ThreadRemovedEvent(threadName, _threadNames.size()));
137             _logger.debug("Thread stop signalled");
138         }
139         else
140         {
141             _logger.warn("No threads to remove");
142         }
143     }
144 
145     /**
146      * {@inheritDoc}
147      */
148     public void removeAllThreads()
149     {
150         checkInitialized();
151 
152         int threadsToRemove = _threadNames.size();
153         _logger.debug("Removing " + threadsToRemove + " thread(s)");
154         for (int i = 0; i < threadsToRemove; i++)
155         {
156             removeThread();
157         }
158         _logger.debug(threadsToRemove + " thread(s) removed");
159     }
160 
161     /**
162      * {@inheritDoc}
163      */
164     public void stop()
165     {
166         stopInternal(false);
167 
168     }
169 
170     private void stopInternal(boolean calledFromEvent)
171     {
172         // Check that we can stop
173         checkStopPrecondition();
174 
175         _logger.info("Stopping scheduler");
176 
177         if (!calledFromEvent)
178         {
179             // Signal stop to all interested parties
180             // Indicate we are flushing the system
181             _eventManager.publish(new PhaseTransitionEvent(_contextManager.getActiveBasherContext(), RUN, END, 0));
182         }
183 
184         // Clear the thread group and the list of active runners
185         _threadGroup = null;
186         _threadNames.clear();
187 
188         _running = false;
189 
190         _logger.info("Scheduler stopped");
191     }
192 
193     /**
194      * {@inheritDoc}
195      */
196     public void start()
197     {
198 
199         // Delegate the call
200         start(ContextManager.DEFAULT_BASHER_CONTEXT_NAME);
201 
202     }
203 
204 
205     public void start(final String contextName)
206     {
207         // Validate the parameters
208         if (contextName == null)
209         {
210             throw new NullPointerException("contextName");
211         }
212 
213         // Check if we have all dependencies required for start
214         checkStartPrecondition();
215 
216         // Look up the default context name
217         final BasherContext basherContext = lookupBasherContext(contextName);
218 
219         // Check that we actually have a context
220         if (basherContext == null)
221         {
222             throw new IllegalArgumentException("The context specified by '" + contextName + "' could not be found");
223         }
224 
225         // Delegate the call
226         start(basherContext);
227 
228     }
229 
230     public void start(final BasherContext basherContext)
231     {
232         // Validate the parameters
233         if (basherContext == null)
234         {
235             throw new NullPointerException("basherContext");
236         }
237 
238         // Check if we have all dependencies required for start
239         checkStartPrecondition();
240 
241         _logger.info("Starting scheduler with context: " + basherContext.getName());
242 
243         if (basherContext.getRunIdentifier() == null)
244         {
245             // Generate a new identifier for this run
246             UUID uuid = UUID.randomUUID();
247             _logger.info("Generated Run Identifier: " + uuid.toString());
248             basherContext.setRunIdentifier(uuid.toString());
249         }
250 
251         _logger.info("Basher Context Details:");
252         _logger.info("Identifier: " + basherContext.getRunIdentifier());
253         _logger.info("Run Duration: " + basherContext.getRunDuration());
254         _logger.info("Initial Number of Threads: " + basherContext.getInitialNumberThreads());
255         _logger.info("Average Interval: " + basherContext.getMarkAverageInterval());
256         _logger.info("BeanShell Directory: " + basherContext.getBeanShellScriptDirectory());
257 
258         // Initial signalling that we are starting up
259         _eventManager.publish(new PhaseTransitionEvent(basherContext, null, Phase.START, 0));
260 
261         _threadGroup = new ThreadGroup("Basher Threads");
262 
263         _threadNames = new ArrayList<String>();
264 
265         // Start the threads
266         for (int i = 0; i < basherContext.getInitialNumberThreads(); i++)
267         {
268             addNewThread(basherContext);
269         }
270 
271         _contextManager.setActiveBasherContext(basherContext);
272 
273         _running = true;
274         _currentState = null;
275 
276         // Signal interested parties that systems are go
277         _eventManager.publish(new PhaseTransitionEvent(basherContext, START, SETUP, basherContext.getSetupDuration()));
278 
279         _logger.info("Scheduler started.  " + basherContext.getInitialNumberThreads() + " thread(s) running");
280     }
281 
282     private BasherContext lookupBasherContext(final String contextName)
283     {
284         return _contextManager.getBasherContext(contextName);
285     }
286 
287     /**
288      * {@inheritDoc}
289      */
290     public int getNumberOfActiveThreads()
291     {
292         checkInitialized();
293         return _threadNames.size();
294     }
295 
296     /**
297      * {@inheritDoc}
298      */
299     public boolean isRunning()
300     {
301         return _running;
302     }
303 
304     public Phase getCurrentPhase()
305     {
306         if (_running)
307         {
308             return _currentState;
309         }
310         else
311         {
312             throw new IllegalStateException("Not running");
313         }
314     }
315 
316     /**
317      * Adds a new thread to the currently running thread group.  Will only add a new thread if we have not yet reached
318      * the maximum number of threads allowed.
319      *
320      * @param basherContext The context currently used.  This is to determine the various boundaries of the threads.
321      */
322     private void addNewThread(final BasherContext basherContext)
323     {
324         // Have we reached the maximum number of threads?
325         if (_threadNames.size() < basherContext.getMaxNumberThreads())
326         {
327             // No, so add a new one
328             _logger.debug("Adding new thread");
329 
330             // And the new thread to run and start it
331             final String threadName = "TaskRunner-" + _threadCounter++;
332             final Thread thread = new Thread(_threadGroup, _taskRunner, threadName);
333             thread.setDaemon(true);
334             thread.start();
335 
336             // Add the task runner to our internally managed list
337             _threadNames.add(threadName);
338             final int currentNumberThreads = _threadNames.size();
339             _logger.info("Thread added.  " + currentNumberThreads + " thread(s) running");
340 
341             // And signal the start
342             _eventManager.publish(new ThreadAddedEvent(threadName, currentNumberThreads, basherContext));
343         }
344         else
345         {
346             _logger.warn("Maximum thread limit (" + _contextManager.getActiveBasherContext().getMaxNumberThreads() + ") reached, not adding more threads");
347         }
348     }
349 
350     /**
351      * Checks if this <code>SchedulerImpl</code> instance has been started or not.
352      *
353      * @throws IllegalStateException If the instance is NOT running
354      */
355     private void checkInitialized()
356     {
357         if (!_running)
358         {
359             throw new IllegalStateException("Not started");
360         }
361     }
362 
363     /**
364      * Convenience method to check that all pre-conditions for a start are met
365      *
366      * @throws IllegalStateException If any of the pre-conditions fails
367      */
368     private void checkStartPrecondition()
369     {
370         if (_running)
371         {
372             throw new IllegalStateException("Already started");
373         }
374 
375         if (_logger == null)
376         {
377             throw new IllegalStateException("no log");
378         }
379 
380         if (_contextManager == null)
381         {
382             throw new IllegalStateException("no context manager");
383         }
384 
385         if (_eventManager == null)
386         {
387             throw new IllegalStateException("no event manager");
388         }
389 
390         if (_taskRunner == null)
391         {
392             throw new IllegalStateException("no task runner");
393         }
394     }
395 
396     /**
397      * Convenience method to check that all pre-conditions for a stop are met
398      *
399      * @throws IllegalStateException If any of the pre-conditions fails
400      */
401     private void checkStopPrecondition()
402     {
403         if (!_running)
404         {
405             throw new IllegalStateException("Already stopped");
406         }
407     }
408 
409 
410     /** Setters for various fields */
411 
412     /**
413      * Sets the logging instance to use
414      *
415      * @param logger The logging instance to use
416      */
417     public void setLog(final Log logger)
418     {
419         _logger = logger;
420     }
421 
422     /**
423      * Sets the context manager instance to use
424      *
425      * @param contextManager The context manager instance to use
426      */
427     public void setContextManager(final ContextManager contextManager)
428     {
429         _contextManager = contextManager;
430     }
431 
432     /**
433      * Sets the event manager to use
434      *
435      * @param eventManager The event manager instance to use
436      */
437     public void setEventManager(final EventManager eventManager)
438     {
439         _eventManager = eventManager;
440     }
441 
442     /**
443      * Sets the task runner to use
444      *
445      * @param taskRunner The task runner instance to use
446      */
447     public void setTaskRunner(final TaskRunner taskRunner)
448     {
449         _taskRunner = taskRunner;
450     }
451 
452     public void setTickTimerTask(final TickTimerTask tickTimerTask)
453     {
454         _tickTimerTask = tickTimerTask;
455     }
456 
457     public void setThreadIncrementTask(final ThreadIncrementTask threadIncrementTask)
458     {
459         _threadIncrementTask = threadIncrementTask;
460     }
461 
462     public void setScheduledExecutorService(final ScheduledExecutorService scheduledExecutorService)
463     {
464         _scheduledExecutorService = scheduledExecutorService;
465     }
466 
467     public void basherEvent(final BasherEvent basherEvent)
468     {
469         // Phase transition events
470 
471         // Ticks should be cleared after run stage
472         if (basherEvent instanceof PhaseTransitionEvent)
473         {
474             final PhaseTransitionEvent phaseTransitionEvent = (PhaseTransitionEvent) basherEvent;
475             final BasherContext basherContext = phaseTransitionEvent.getBasherContext();
476 
477             _logger.info(String.format("Received phase transition: %s -> %s [%s duration %d second(s)]",
478                     (phaseTransitionEvent.getOldPhase() == null ? "N/A" : phaseTransitionEvent.getOldPhase()),
479                     phaseTransitionEvent.getNewPhase(),
480                     phaseTransitionEvent.getNewPhase(),
481                     phaseTransitionEvent.getDurationNextPhase()
482             ));
483 
484             _currentState = phaseTransitionEvent.getNewPhase();
485 
486             switch (phaseTransitionEvent.getNewPhase())
487             {
488                 case START:
489                     processStartEvent(basherContext);
490                     break;
491                 case SETUP:
492                     processSetupEvent(basherContext);
493                     break;
494                 case WARMUP:
495                     processWarmupEvent(basherContext);
496                     break;
497                 case RUN:
498                     processRunEvent(basherContext);
499                     break;
500                 case COOLDOWN:
501                     processCooldownEvent(basherContext);
502                     break;
503                 case TEARDOWN:
504                     processTeardownEvent(basherContext);
505                     break;
506                 case END:
507                     processEndEvent();
508                 default:
509             }
510         }
511         else if (basherEvent instanceof NoTasksAvailableEvent)
512         {
513             // proceed to next phase immediately!
514 
515             final BasherContext basherContext = _contextManager.getActiveBasherContext();
516 
517             // First, cancel the swap-over
518             switch (_currentState)
519             {
520                 case SETUP:
521                     _lastScheduledPhaseTransitionFuture.cancel(false);
522                     _logger.info("Scheduling immediate phase transition SETUP -> WARMUP");
523                     _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, SETUP, WARMUP, basherContext.getWarmupDuration())), 1000, TimeUnit.MILLISECONDS);
524                     break;
525                 case TEARDOWN:
526                     _lastScheduledPhaseTransitionFuture.cancel(false);
527                     _logger.info("Scheduling immediate phase transition TEARDOWN -> END");
528                     _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, TEARDOWN, END, basherContext.getWarmupDuration())), 1000, TimeUnit.MILLISECONDS);
529                     break;
530                 default:
531             }
532 
533         }
534 
535 
536     }
537 
538     private void processStartEvent(final BasherContext basherContext)
539     {
540         // Nothing for now
541     }
542 
543     private void processSetupEvent(final BasherContext basherContext)
544     {
545         // Schedule end of setup and start of warm-up
546         long nextEventTime = (1 + basherContext.getSetupDuration()) * 1000;
547         _logger.debug("Scheduling phase transition SETUP -> WARMUP at " + nextEventTime);
548         _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, SETUP, WARMUP, basherContext.getWarmupDuration())), nextEventTime, TimeUnit.MILLISECONDS);
549 
550     }
551 
552     private void processWarmupEvent(final BasherContext basherContext)
553     {
554         // Schedule end of setup and start of warm-up
555         long nextEventTime = 1 + (basherContext.getWarmupDuration() * 1000);
556         _logger.debug("Scheduling phase transition WARMUP -> RUN at " + nextEventTime);
557         _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, WARMUP, RUN, basherContext.getRunDuration())), nextEventTime, TimeUnit.MILLISECONDS);
558     }
559 
560     private void processRunEvent(final BasherContext basherContext)
561     {
562         // Schedule cool-down
563         long nextEventTime = 1 + (basherContext.getRunDuration() * 1000);
564         _logger.debug("Scheduling phase transition RUN -> COOLDOWN at " + nextEventTime);
565         _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, RUN, COOLDOWN, basherContext.getCooldownDuration())), nextEventTime, TimeUnit.MILLISECONDS);
566 
567         _tickTimerTaskScheduledFuture = _scheduledExecutorService.scheduleAtFixedRate(_tickTimerTask, basherContext.getMarkAverageInterval() * 1000, basherContext.getMarkAverageInterval() * 1000, TimeUnit.MILLISECONDS);
568 
569         // Schedule the ticker - emitting ticks within the system
570         if (basherContext.getThreadIncrementCount() > 0)
571         {
572             _threadIncrementTaskScheduledFuture = _scheduledExecutorService.scheduleAtFixedRate(_threadIncrementTask, basherContext.getThreadIncrementInterval() * 1000, basherContext.getThreadIncrementInterval() * 1000, TimeUnit.MILLISECONDS);
573         }
574 
575         // Schedule collection start
576         long startCollectionFrom = basherContext.getStartCollectionFrom();
577         if (startCollectionFrom == 0)
578         {
579             // Always start collection from the beginning if not specified!
580             startCollectionFrom = 1;
581         }
582         _collectionStartScheduledFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new CollectionStartedEvent(basherContext)), startCollectionFrom * 1000, TimeUnit.MILLISECONDS);
583 
584         // Stop collection only if a stop is defined!
585         if (basherContext.getStopCollectionAfter() > 0)
586         {
587             _collectionStopScheduledFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new CollectionStoppedEvent(basherContext)), basherContext.getStopCollectionAfter() * 1000, TimeUnit.MILLISECONDS);
588         }
589 
590 
591     }
592 
593     private void processCooldownEvent(final BasherContext basherContext)
594     {
595         // Schedule tear-down
596         long nextEventTime = 1 + (basherContext.getCooldownDuration() * 1000);
597         _logger.debug("Scheduling phase transition COOLDOWN -> TEARDOWN at " + nextEventTime);
598         _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, COOLDOWN, TEARDOWN, basherContext.getTeardownDuration())), nextEventTime, TimeUnit.MILLISECONDS);
599 
600         unscheduleCollectionIfNeeded();
601 
602         // Un-schedule the ticker
603         _tickTimerTaskScheduledFuture.cancel(true);
604         if (basherContext.getThreadIncrementCount() > 0)
605         {
606             _threadIncrementTaskScheduledFuture.cancel(true);
607         }
608 
609     }
610 
611     private void unscheduleCollectionIfNeeded()
612     {
613         if (_collectionStartScheduledFuture != null)
614         {
615             _collectionStartScheduledFuture.cancel(false);
616         }
617         if (_collectionStopScheduledFuture != null)
618         {
619             _collectionStopScheduledFuture.cancel(false);
620         }
621     }
622 
623     private void processTeardownEvent(final BasherContext basherContext)
624     {
625         unscheduleCollectionIfNeeded();
626 
627         // Schedule end of run
628         long nextEventTime = 1 + (basherContext.getTeardownDuration() * 1000);
629         _logger.debug("Scheduling phase transition TEARDOWN -> END at " + nextEventTime);
630         _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, TEARDOWN, END, 0)), nextEventTime, TimeUnit.MILLISECONDS);
631     }
632 
633     private void processEndEvent()
634     {
635         unscheduleCollectionIfNeeded();
636 
637         stopInternal(true);
638     }
639 
640 }