1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package net.sourceforge.basher.internal.impl;
16
17 import java.util.HashSet;
18 import java.util.Set;
19 import java.util.concurrent.Semaphore;
20 import java.util.concurrent.atomic.AtomicBoolean;
21
22 import net.sourceforge.basher.Basher;
23 import net.sourceforge.basher.TaskExecutionContext;
24 import net.sourceforge.basher.TaskManager;
25 import net.sourceforge.basher.events.*;
26 import net.sourceforge.basher.internal.Randomizer;
27 import net.sourceforge.basher.internal.TaskInvoker;
28 import net.sourceforge.basher.internal.TaskRunner;
29 import org.apache.commons.logging.Log;
30 import org.ops4j.gaderian.events.RegistryShutdownListener;
31
32
33
34
35
36 public class TaskRunnerImpl implements TaskRunner, BasherEventListener, RegistryShutdownListener
37 {
38 private Randomizer _randomizer;
39 private TaskManager _taskManager;
40 private TaskInvoker _taskInvoker;
41 private Log _logger;
42 private int _minTime;
43 private int _maxDelay;
44
45 private final Set<String> _threadsShutdown = new HashSet<String>();
46
47 private final Semaphore _semaphore = new Semaphore(0);
48
49 private final AtomicBoolean _keepRunning = new AtomicBoolean(true);
50 public int numberOfThreadsRunning = 0;
51
52 public void setRandomizer(final Randomizer randomizer)
53 {
54 _randomizer = randomizer;
55 }
56
57 public void setLog(final Log logger)
58 {
59 _logger = logger;
60 }
61
62 public void setTaskManager(final TaskManager taskManager)
63 {
64 _taskManager = taskManager;
65 }
66
67 public void setTaskInvoker(final TaskInvoker taskInvoker)
68 {
69 _taskInvoker = taskInvoker;
70 }
71
72
73 public void stopInvoking()
74 {
75
76 _keepRunning.set(false);
77 }
78
79
80
81
82 public void run()
83 {
84
85 while (keepProcessing())
86 {
87 try
88 {
89
90 final TaskExecutionContext taskExecutionContext = _taskManager.getNextTaskExecutionContext();
91
92
93 if ( taskExecutionContext != null)
94 {
95 _taskInvoker.invokeTask( taskExecutionContext );
96 }
97
98
99 if (_keepRunning.get())
100 {
101 Basher.fireCleanUpThread();
102 Thread.sleep(getRandomSleepTime());
103 }
104 }
105 catch (final Throwable e)
106 {
107 _logger.error(e.getMessage(), e);
108 }
109 }
110 }
111
112 private boolean keepProcessing()
113 {
114 try
115 {
116 _semaphore.acquire();
117 try
118 {
119
120 final String threadName = Thread.currentThread().getName();
121 if (_threadsShutdown.contains(threadName))
122 {
123 _threadsShutdown.remove(threadName);
124
125 _semaphore.acquire();
126 return false;
127 }
128 return _keepRunning.get();
129 }
130 finally
131 {
132 _semaphore.release();
133 }
134
135 }
136 catch (final InterruptedException e)
137 {
138
139 }
140 return false;
141 }
142
143 private long getRandomSleepTime()
144 {
145
146 return _minTime + _randomizer.getRandomInt(_maxDelay);
147 }
148
149 public void basherEvent(final BasherEvent basherEvent)
150 {
151 if (basherEvent instanceof PhaseTransitionEvent)
152 {
153
154 final PhaseTransitionEvent phaseTransitionEvent = (PhaseTransitionEvent) basherEvent;
155 _maxDelay = phaseTransitionEvent.getBasherContext().getTaskMaxDelay();
156 _minTime = phaseTransitionEvent.getBasherContext().getTaskMinDelay();
157
158 switch (phaseTransitionEvent.getNewPhase())
159 {
160 case START:
161 numberOfThreadsRunning = phaseTransitionEvent.getBasherContext().getInitialNumberThreads();
162 break;
163 case SETUP:
164 break;
165 case RUN:
166
167 _logger.info("Number of available permits: " + _semaphore.availablePermits());
168 _logger.info("Number of threads: " + numberOfThreadsRunning);
169 _semaphore.release(numberOfThreadsRunning);
170 break;
171 case COOLDOWN:
172 case TEARDOWN:
173 try
174 {
175 _semaphore.acquire(_semaphore.availablePermits());
176 }
177 catch (final InterruptedException e)
178 {
179 e.printStackTrace();
180 }
181 break;
182 case END:
183 flipProcessing(false);
184 default:
185 }
186 }
187 else if (basherEvent instanceof ThreadAddedEvent)
188 {
189 numberOfThreadsRunning = ((ThreadAddedEvent)basherEvent).getCurrentNumberThreads();
190 _semaphore.release(1);
191 }
192 else if (basherEvent instanceof ThreadRemovedEvent)
193 {
194 final ThreadRemovedEvent threadRemovedEvent = (ThreadRemovedEvent) basherEvent;
195 numberOfThreadsRunning = threadRemovedEvent.getCurrentNumberThreads();
196 final String threadName = threadRemovedEvent.getName();
197
198 if (!_threadsShutdown.contains(threadName))
199 {
200 _logger.info("Thread " + threadName + " shutting down on request");
201 _threadsShutdown.add(threadName);
202 }
203 }
204 else if (basherEvent instanceof NoTasksAvailableEvent)
205 {
206 flipProcessing(false);
207 }
208 else if (basherEvent instanceof TasksAvailableEvent)
209 {
210
211 flipProcessing(true);
212 }
213 }
214
215 private void flipProcessing(final boolean state)
216 {
217 if (state)
218 {
219 _semaphore.release(numberOfThreadsRunning);
220 }
221 else
222 {
223 if (_semaphore.availablePermits() == 0)
224 {
225
226 }
227 else
228 {
229 try
230 {
231 _semaphore.acquire(_semaphore.availablePermits());
232 }
233 catch (final InterruptedException e)
234 {
235 e.printStackTrace();
236 }
237 }
238 }
239 }
240
241 public void registryDidShutdown()
242 {
243 stopInvoking();
244 }
245 }