1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package net.sourceforge.basher.impl;
45
46 import java.util.ArrayList;
47 import java.util.List;
48 import java.util.UUID;
49 import java.util.concurrent.Executors;
50 import java.util.concurrent.ScheduledExecutorService;
51 import java.util.concurrent.ScheduledFuture;
52 import java.util.concurrent.TimeUnit;
53
54 import net.sourceforge.basher.BasherContext;
55 import net.sourceforge.basher.ContextManager;
56
57 import static net.sourceforge.basher.Phase.*;
58
59 import net.sourceforge.basher.Scheduler;
60 import net.sourceforge.basher.Phase;
61 import net.sourceforge.basher.events.*;
62 import net.sourceforge.basher.internal.TaskRunner;
63 import net.sourceforge.basher.internal.tasks.ThreadIncrementTask;
64 import org.apache.commons.logging.Log;
65
66
67
68
69
70
71
72 public class SchedulerImpl implements Scheduler, BasherEventListener
73 {
74 private Log _logger;
75 private ContextManager _contextManager;
76 private EventManager _eventManager;
77 private TaskRunner _taskRunner;
78
79 private List<String> _threadNames;
80 private ThreadGroup _threadGroup;
81
82 private int _threadCounter = 0;
83 private boolean _running = false;
84
85 private Phase _currentState = null;
86
87 private TickTimerTask _tickTimerTask;
88 private ThreadIncrementTask _threadIncrementTask;
89
90 private ScheduledExecutorService _scheduledExecutorService = Executors.newScheduledThreadPool(5);
91
92 private ScheduledFuture<?> _collectionStartScheduledFuture;
93 private ScheduledFuture<?> _collectionStopScheduledFuture;
94 private ScheduledFuture<?> _tickTimerTaskScheduledFuture;
95 private ScheduledFuture<?> _threadIncrementTaskScheduledFuture;
96
97 private ScheduledFuture<?> _lastScheduledPhaseTransitionFuture;
98
99
100
101
102
103 public synchronized void addThread()
104 {
105 checkInitialized();
106 addNewThread(_contextManager.getActiveBasherContext());
107 }
108
109
110
111
112 public void addThreads(final int numToAdd)
113 {
114 checkInitialized();
115 _logger.debug("Adding " + numToAdd + " thread(s)");
116 for (int i = 0; i < numToAdd; i++)
117 {
118 addThread();
119 }
120 _logger.debug(numToAdd + " thread(s) added");
121 }
122
123
124
125
126 public synchronized void removeThread()
127 {
128 checkInitialized();
129
130 if (_threadNames.size() > 0)
131 {
132 _logger.debug("Removing thread from active list");
133 String threadName = _threadNames.remove(0);
134 _logger.debug("Thread " + threadName + " removed from active list");
135 _logger.debug("Signalling stop thread for thread named: " + threadName);
136 _eventManager.publish(new ThreadRemovedEvent(threadName, _threadNames.size()));
137 _logger.debug("Thread stop signalled");
138 }
139 else
140 {
141 _logger.warn("No threads to remove");
142 }
143 }
144
145
146
147
148 public void removeAllThreads()
149 {
150 checkInitialized();
151
152 int threadsToRemove = _threadNames.size();
153 _logger.debug("Removing " + threadsToRemove + " thread(s)");
154 for (int i = 0; i < threadsToRemove; i++)
155 {
156 removeThread();
157 }
158 _logger.debug(threadsToRemove + " thread(s) removed");
159 }
160
161
162
163
164 public void stop()
165 {
166 stopInternal(false);
167
168 }
169
170 private void stopInternal(boolean calledFromEvent)
171 {
172
173 checkStopPrecondition();
174
175 _logger.info("Stopping scheduler");
176
177 if (!calledFromEvent)
178 {
179
180
181 _eventManager.publish(new PhaseTransitionEvent(_contextManager.getActiveBasherContext(), RUN, END, 0));
182 }
183
184
185 _threadGroup = null;
186 _threadNames.clear();
187
188 _running = false;
189
190 _logger.info("Scheduler stopped");
191 }
192
193
194
195
196 public void start()
197 {
198
199
200 start(ContextManager.DEFAULT_BASHER_CONTEXT_NAME);
201
202 }
203
204
205 public void start(final String contextName)
206 {
207
208 if (contextName == null)
209 {
210 throw new NullPointerException("contextName");
211 }
212
213
214 checkStartPrecondition();
215
216
217 final BasherContext basherContext = lookupBasherContext(contextName);
218
219
220 if (basherContext == null)
221 {
222 throw new IllegalArgumentException("The context specified by '" + contextName + "' could not be found");
223 }
224
225
226 start(basherContext);
227
228 }
229
230 public void start(final BasherContext basherContext)
231 {
232
233 if (basherContext == null)
234 {
235 throw new NullPointerException("basherContext");
236 }
237
238
239 checkStartPrecondition();
240
241 _logger.info("Starting scheduler with context: " + basherContext.getName());
242
243 if (basherContext.getRunIdentifier() == null)
244 {
245
246 UUID uuid = UUID.randomUUID();
247 _logger.info("Generated Run Identifier: " + uuid.toString());
248 basherContext.setRunIdentifier(uuid.toString());
249 }
250
251 _logger.info("Basher Context Details:");
252 _logger.info("Identifier: " + basherContext.getRunIdentifier());
253 _logger.info("Run Duration: " + basherContext.getRunDuration());
254 _logger.info("Initial Number of Threads: " + basherContext.getInitialNumberThreads());
255 _logger.info("Average Interval: " + basherContext.getMarkAverageInterval());
256 _logger.info("BeanShell Directory: " + basherContext.getBeanShellScriptDirectory());
257
258
259 _eventManager.publish(new PhaseTransitionEvent(basherContext, null, Phase.START, 0));
260
261 _threadGroup = new ThreadGroup("Basher Threads");
262
263 _threadNames = new ArrayList<String>();
264
265
266 for (int i = 0; i < basherContext.getInitialNumberThreads(); i++)
267 {
268 addNewThread(basherContext);
269 }
270
271 _contextManager.setActiveBasherContext(basherContext);
272
273 _running = true;
274 _currentState = null;
275
276
277 _eventManager.publish(new PhaseTransitionEvent(basherContext, START, SETUP, basherContext.getSetupDuration()));
278
279 _logger.info("Scheduler started. " + basherContext.getInitialNumberThreads() + " thread(s) running");
280 }
281
282 private BasherContext lookupBasherContext(final String contextName)
283 {
284 return _contextManager.getBasherContext(contextName);
285 }
286
287
288
289
290 public int getNumberOfActiveThreads()
291 {
292 checkInitialized();
293 return _threadNames.size();
294 }
295
296
297
298
299 public boolean isRunning()
300 {
301 return _running;
302 }
303
304 public Phase getCurrentPhase()
305 {
306 if (_running)
307 {
308 return _currentState;
309 }
310 else
311 {
312 throw new IllegalStateException("Not running");
313 }
314 }
315
316
317
318
319
320
321
322 private void addNewThread(final BasherContext basherContext)
323 {
324
325 if (_threadNames.size() < basherContext.getMaxNumberThreads())
326 {
327
328 _logger.debug("Adding new thread");
329
330
331 final String threadName = "TaskRunner-" + _threadCounter++;
332 final Thread thread = new Thread(_threadGroup, _taskRunner, threadName);
333 thread.setDaemon(true);
334 thread.start();
335
336
337 _threadNames.add(threadName);
338 final int currentNumberThreads = _threadNames.size();
339 _logger.info("Thread added. " + currentNumberThreads + " thread(s) running");
340
341
342 _eventManager.publish(new ThreadAddedEvent(threadName, currentNumberThreads, basherContext));
343 }
344 else
345 {
346 _logger.warn("Maximum thread limit (" + _contextManager.getActiveBasherContext().getMaxNumberThreads() + ") reached, not adding more threads");
347 }
348 }
349
350
351
352
353
354
355 private void checkInitialized()
356 {
357 if (!_running)
358 {
359 throw new IllegalStateException("Not started");
360 }
361 }
362
363
364
365
366
367
368 private void checkStartPrecondition()
369 {
370 if (_running)
371 {
372 throw new IllegalStateException("Already started");
373 }
374
375 if (_logger == null)
376 {
377 throw new IllegalStateException("no log");
378 }
379
380 if (_contextManager == null)
381 {
382 throw new IllegalStateException("no context manager");
383 }
384
385 if (_eventManager == null)
386 {
387 throw new IllegalStateException("no event manager");
388 }
389
390 if (_taskRunner == null)
391 {
392 throw new IllegalStateException("no task runner");
393 }
394 }
395
396
397
398
399
400
401 private void checkStopPrecondition()
402 {
403 if (!_running)
404 {
405 throw new IllegalStateException("Already stopped");
406 }
407 }
408
409
410
411
412
413
414
415
416
417 public void setLog(final Log logger)
418 {
419 _logger = logger;
420 }
421
422
423
424
425
426
427 public void setContextManager(final ContextManager contextManager)
428 {
429 _contextManager = contextManager;
430 }
431
432
433
434
435
436
437 public void setEventManager(final EventManager eventManager)
438 {
439 _eventManager = eventManager;
440 }
441
442
443
444
445
446
447 public void setTaskRunner(final TaskRunner taskRunner)
448 {
449 _taskRunner = taskRunner;
450 }
451
452 public void setTickTimerTask(final TickTimerTask tickTimerTask)
453 {
454 _tickTimerTask = tickTimerTask;
455 }
456
457 public void setThreadIncrementTask(final ThreadIncrementTask threadIncrementTask)
458 {
459 _threadIncrementTask = threadIncrementTask;
460 }
461
462 public void setScheduledExecutorService(final ScheduledExecutorService scheduledExecutorService)
463 {
464 _scheduledExecutorService = scheduledExecutorService;
465 }
466
467 public void basherEvent(final BasherEvent basherEvent)
468 {
469
470
471
472 if (basherEvent instanceof PhaseTransitionEvent)
473 {
474 final PhaseTransitionEvent phaseTransitionEvent = (PhaseTransitionEvent) basherEvent;
475 final BasherContext basherContext = phaseTransitionEvent.getBasherContext();
476
477 _logger.info(String.format("Received phase transition: %s -> %s [%s duration %d second(s)]",
478 (phaseTransitionEvent.getOldPhase() == null ? "N/A" : phaseTransitionEvent.getOldPhase()),
479 phaseTransitionEvent.getNewPhase(),
480 phaseTransitionEvent.getNewPhase(),
481 phaseTransitionEvent.getDurationNextPhase()
482 ));
483
484 _currentState = phaseTransitionEvent.getNewPhase();
485
486 switch (phaseTransitionEvent.getNewPhase())
487 {
488 case START:
489 processStartEvent(basherContext);
490 break;
491 case SETUP:
492 processSetupEvent(basherContext);
493 break;
494 case WARMUP:
495 processWarmupEvent(basherContext);
496 break;
497 case RUN:
498 processRunEvent(basherContext);
499 break;
500 case COOLDOWN:
501 processCooldownEvent(basherContext);
502 break;
503 case TEARDOWN:
504 processTeardownEvent(basherContext);
505 break;
506 case END:
507 processEndEvent();
508 default:
509 }
510 }
511 else if (basherEvent instanceof NoTasksAvailableEvent)
512 {
513
514
515 final BasherContext basherContext = _contextManager.getActiveBasherContext();
516
517
518 switch (_currentState)
519 {
520 case SETUP:
521 _lastScheduledPhaseTransitionFuture.cancel(false);
522 _logger.info("Scheduling immediate phase transition SETUP -> WARMUP");
523 _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, SETUP, WARMUP, basherContext.getWarmupDuration())), 1000, TimeUnit.MILLISECONDS);
524 break;
525 case TEARDOWN:
526 _lastScheduledPhaseTransitionFuture.cancel(false);
527 _logger.info("Scheduling immediate phase transition TEARDOWN -> END");
528 _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, TEARDOWN, END, basherContext.getWarmupDuration())), 1000, TimeUnit.MILLISECONDS);
529 break;
530 default:
531 }
532
533 }
534
535
536 }
537
538 private void processStartEvent(final BasherContext basherContext)
539 {
540
541 }
542
543 private void processSetupEvent(final BasherContext basherContext)
544 {
545
546 long nextEventTime = (1 + basherContext.getSetupDuration()) * 1000;
547 _logger.debug("Scheduling phase transition SETUP -> WARMUP at " + nextEventTime);
548 _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, SETUP, WARMUP, basherContext.getWarmupDuration())), nextEventTime, TimeUnit.MILLISECONDS);
549
550 }
551
552 private void processWarmupEvent(final BasherContext basherContext)
553 {
554
555 long nextEventTime = 1 + (basherContext.getWarmupDuration() * 1000);
556 _logger.debug("Scheduling phase transition WARMUP -> RUN at " + nextEventTime);
557 _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, WARMUP, RUN, basherContext.getRunDuration())), nextEventTime, TimeUnit.MILLISECONDS);
558 }
559
560 private void processRunEvent(final BasherContext basherContext)
561 {
562
563 long nextEventTime = 1 + (basherContext.getRunDuration() * 1000);
564 _logger.debug("Scheduling phase transition RUN -> COOLDOWN at " + nextEventTime);
565 _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, RUN, COOLDOWN, basherContext.getCooldownDuration())), nextEventTime, TimeUnit.MILLISECONDS);
566
567 _tickTimerTaskScheduledFuture = _scheduledExecutorService.scheduleAtFixedRate(_tickTimerTask, basherContext.getMarkAverageInterval() * 1000, basherContext.getMarkAverageInterval() * 1000, TimeUnit.MILLISECONDS);
568
569
570 if (basherContext.getThreadIncrementCount() > 0)
571 {
572 _threadIncrementTaskScheduledFuture = _scheduledExecutorService.scheduleAtFixedRate(_threadIncrementTask, basherContext.getThreadIncrementInterval() * 1000, basherContext.getThreadIncrementInterval() * 1000, TimeUnit.MILLISECONDS);
573 }
574
575
576 long startCollectionFrom = basherContext.getStartCollectionFrom();
577 if (startCollectionFrom == 0)
578 {
579
580 startCollectionFrom = 1;
581 }
582 _collectionStartScheduledFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new CollectionStartedEvent(basherContext)), startCollectionFrom * 1000, TimeUnit.MILLISECONDS);
583
584
585 if (basherContext.getStopCollectionAfter() > 0)
586 {
587 _collectionStopScheduledFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new CollectionStoppedEvent(basherContext)), basherContext.getStopCollectionAfter() * 1000, TimeUnit.MILLISECONDS);
588 }
589
590
591 }
592
593 private void processCooldownEvent(final BasherContext basherContext)
594 {
595
596 long nextEventTime = 1 + (basherContext.getCooldownDuration() * 1000);
597 _logger.debug("Scheduling phase transition COOLDOWN -> TEARDOWN at " + nextEventTime);
598 _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, COOLDOWN, TEARDOWN, basherContext.getTeardownDuration())), nextEventTime, TimeUnit.MILLISECONDS);
599
600 unscheduleCollectionIfNeeded();
601
602
603 _tickTimerTaskScheduledFuture.cancel(true);
604 if (basherContext.getThreadIncrementCount() > 0)
605 {
606 _threadIncrementTaskScheduledFuture.cancel(true);
607 }
608
609 }
610
611 private void unscheduleCollectionIfNeeded()
612 {
613 if (_collectionStartScheduledFuture != null)
614 {
615 _collectionStartScheduledFuture.cancel(false);
616 }
617 if (_collectionStopScheduledFuture != null)
618 {
619 _collectionStopScheduledFuture.cancel(false);
620 }
621 }
622
623 private void processTeardownEvent(final BasherContext basherContext)
624 {
625 unscheduleCollectionIfNeeded();
626
627
628 long nextEventTime = 1 + (basherContext.getTeardownDuration() * 1000);
629 _logger.debug("Scheduling phase transition TEARDOWN -> END at " + nextEventTime);
630 _lastScheduledPhaseTransitionFuture = _scheduledExecutorService.schedule(new EventEmitterTimerTask(_eventManager, new PhaseTransitionEvent(basherContext, TEARDOWN, END, 0)), nextEventTime, TimeUnit.MILLISECONDS);
631 }
632
633 private void processEndEvent()
634 {
635 unscheduleCollectionIfNeeded();
636
637 stopInternal(true);
638 }
639
640 }