1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package net.sourceforge.basher.internal.impl;
16
17 import net.sourceforge.basher.*;
18 import net.sourceforge.basher.internal.Randomizer;
19 import net.sourceforge.basher.internal.TaskInvoker;
20 import net.sourceforge.basher.internal.TimeSource;
21 import org.apache.commons.logging.Log;
22
23
24
25
26
27 public class TaskInvokerImpl implements TaskInvoker
28 {
29 private Randomizer _randomizer;
30 private Collector _collector;
31 private Log _logger;
32 private TaskManager _taskManager;
33 private TimeSource _timeSource;
34
35 public void setTimeSource(final TimeSource timeSource)
36 {
37 _timeSource = timeSource;
38 }
39
40 public void setLog(final Log log)
41 {
42 _logger = log;
43 }
44
45 public void setRandomizer(final Randomizer randomizer)
46 {
47 _randomizer = randomizer;
48 }
49
50 public void setCollector(final Collector collector)
51 {
52 _collector = collector;
53 }
54
55 public void setTaskManager(final TaskManager taskManager)
56 {
57 _taskManager = taskManager;
58 }
59
60 public void invokeTask(final TaskExecutionContext taskExecutionContext )
61 {
62 doInvokeTask( taskExecutionContext, true);
63 }
64
65 public void invokeTask(final TaskExecutionContext taskExecutionContext, final boolean invokeFollowers)
66 {
67 doInvokeTask(taskExecutionContext, invokeFollowers);
68 }
69
70 private void doInvokeTask(final TaskExecutionContext taskExecutionContext, final boolean invokeFollower)
71 {
72 final Task task = taskExecutionContext.getTask();
73
74 final TaskConfiguration taskConfiguration = taskExecutionContext.getTaskConfiguration();
75
76
77 taskExecutionContext.reserveInvocation();
78
79 try
80 {
81
82 final int invocations = taskExecutionContext.getInvocations() - 1;
83
84 if (taskConfiguration.getMaxInvocations() != 0 && invocations >= taskConfiguration.getMaxInvocations())
85 {
86 if (_logger.isDebugEnabled())
87 {
88 _logger.debug("Not running task: " + taskConfiguration.getTaskName() + " (max invocations reached)");
89 }
90 _collector.notRun(taskExecutionContext, 0, 0 );
91 _taskManager.removeTaskExecutionContext(taskExecutionContext.getIdentifier());
92 return;
93 }
94
95 final long currentTime = _timeSource.getElapsedTime() / 1000;
96
97 if (currentTime < taskConfiguration.getRunFrom() || currentTime > taskConfiguration.getStopAfter())
98 {
99 if (_logger.isDebugEnabled())
100 {
101 _logger.debug(String.format("Not running task: %s (not within time boundary %d/%d/%d)",taskConfiguration.getTaskName(),currentTime,taskConfiguration.getRunFrom(),taskConfiguration.getStopAfter()));
102 }
103 _collector.notRun(taskExecutionContext, 0, 0 );
104 return;
105 }
106
107 final int weigher = _randomizer.getRandomInt(100);
108
109 final int weight = taskConfiguration.getWeight();
110
111 if (weigher < weight)
112 {
113
114 final long startTime = _timeSource.getCurrentTime();
115 final long startTick = _timeSource.getNanoTime();
116 try
117 {
118 if (_logger.isDebugEnabled())
119 {
120 _logger.debug("Invoking task: " + taskConfiguration.getTaskName());
121 }
122 taskExecutionContext.executeTask();
123
124 if (invokeFollower && taskExecutionContext.getFollowers().size() != 0)
125 {
126 if (_logger.isDebugEnabled())
127 {
128 _logger.debug("Invoking followers");
129 }
130
131 for ( final TaskExecutionContext follower : taskExecutionContext.getFollowers() )
132 {
133 invokeTask( follower, false );
134 }
135 }
136
137 _collector.success(taskExecutionContext, _timeSource.getCurrentTime() - startTime, _timeSource.getNanoTime() - startTick);
138 }
139 catch (TaskNotRunException e)
140 {
141
142 if (_logger.isDebugEnabled())
143 {
144 _logger.debug("Task not run: " + taskConfiguration.getTaskName() + " (" + e.getMessage() + ")");
145 }
146 _collector.notRun(taskExecutionContext, _timeSource.getCurrentTime() - startTime, _timeSource.getNanoTime() - startTick);
147 }
148 catch (Throwable throwable)
149 {
150
151 if (_logger.isErrorEnabled())
152 {
153 _logger.error("Task failed: " + taskConfiguration.getTaskName(), throwable);
154 }
155 _collector.fail(taskExecutionContext, _timeSource.getCurrentTime() - startTime, _timeSource.getNanoTime() - startTick, throwable);
156 }
157 }
158 else
159 {
160 if (_logger.isDebugEnabled())
161 {
162 _logger.debug(String.format("Not running task: %s (weight: %d / weigher: %d)", taskConfiguration.getTaskName(), weight, weigher));
163 }
164 _collector.notRun(taskExecutionContext, 0, 0 );
165 }
166 }
167 finally
168 {
169
170 taskExecutionContext.releaseInvocation();
171 }
172 }
173 }