1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package net.sourceforge.basher.impl;
16
17 import java.io.*;
18 import java.util.*;
19 import java.util.concurrent.ArrayBlockingQueue;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.TimeUnit;
22
23 import net.sourceforge.basher.Average;
24 import net.sourceforge.basher.Task;
25 import net.sourceforge.basher.BasherContext;
26 import net.sourceforge.basher.TaskExecutionContext;
27
28
29
30
31
32 public abstract class AbstractFileCollector extends AbstractCollector
33 {
34 protected final Map<String, OpenFile> _openFiles = new HashMap<String, OpenFile>();
35 protected File _parent;
36 protected String _collectionDirectory;
37
38 private String _averageHeader = null;
39 private String _averageFooter = null;
40 private String _executionHeader = null;
41 private String _executionFooter = null;
42
43 private String _averageFilenamePrefix = "averages.";
44 private String _extension;
45
46 private int _queueCapacity = 200;
47 private BlockingQueue<FileEntry> _fileEntries;
48 private int _numThreads = 5;
49 private List<FileEntryProcessor> _fileEntryProcessors;
50 private long _timeOut = 500;
51 private boolean _keepRunning = true;
52 private boolean _ready = true;
53
54 public void setNumThreads(final int numThreads)
55 {
56 _numThreads = numThreads;
57 }
58
59 public void setTimeOut(final long timeOut)
60 {
61 _timeOut = timeOut;
62 }
63
64 public void setQueueCapacity(final int queueCapacity)
65 {
66 _queueCapacity = queueCapacity;
67 }
68
69 public void setAverageFooter(final String averageFooter)
70 {
71 _averageFooter = averageFooter;
72 }
73
74 public void setExecutionFooter(final String executionFooter)
75 {
76 _executionFooter = executionFooter;
77 }
78
79 public void setAverageHeader(final String averageHeader)
80 {
81 _averageHeader = averageHeader;
82 }
83
84 public void setExecutionHeader(final String executionHeader)
85 {
86 _executionHeader = executionHeader;
87 }
88
89 public void setExtension(final String extension)
90 {
91 _extension = extension;
92 }
93
94 public void setAverageFilenamePrefix(final String averageFilenamePrefix)
95 {
96 _averageFilenamePrefix = averageFilenamePrefix;
97 }
98
99
100
101
102
103
104 public void initializeService() throws Exception
105 {
106
107 }
108
109
110
111
112
113
114
115
116 public void success( final TaskExecutionContext taskExecutionContext, final long elapsedTime, final long elapsedTimeNanos )
117 {
118 super.success(taskExecutionContext, elapsedTime, elapsedTimeNanos );
119
120 if (_ready && isCollecting())
121 {
122 writeToFile(Type.SUCCESS, taskExecutionContext.getTaskConfiguration().getTaskName(), elapsedTime, elapsedTimeNanos );
123 }
124 }
125
126
127
128
129
130
131
132
133
134 public void fail( final TaskExecutionContext taskExecutionContext, final long elapsedTime, final long elapsedTimeNanos, final Throwable throwable )
135 {
136 super.fail(taskExecutionContext, elapsedTime, elapsedTimeNanos, throwable);
137 if (_ready && isCollecting())
138 {
139 writeToFile(Type.FAILURE, taskExecutionContext.getTaskConfiguration().getTaskName(), elapsedTime, elapsedTimeNanos );
140 }
141 }
142
143 @Override
144 protected void closeOpenResources()
145 {
146 _log.info("Closing open resources");
147 _log.info("Collection queue size: " + _fileEntries.size());
148 if (_fileEntries.size() > 0)
149 {
150 _log.info("Draining collection queue");
151 while (_fileEntries.size() > 0)
152 {
153 try
154 {
155 Thread.sleep(250);
156 }
157 catch (InterruptedException e)
158 {
159
160 }
161 }
162 }
163
164 _keepRunning = false;
165
166 _log.info("Closing open files");
167 _log.info("Files open: " + _openFiles.size());
168
169 final Collection<OpenFile> entries = _openFiles.values();
170 try
171 {
172 for (final OpenFile entry : entries)
173 {
174 try
175 {
176 final Writer writer = entry._bufferedWriter;
177 addFooter(entry._bufferedWriter, entry._classifier);
178 writer.flush();
179 writer.close();
180 }
181 catch (IOException e)
182 {
183 _log.error(e.getMessage(), e);
184 }
185 }
186 }
187 finally
188 {
189
190 _openFiles.clear();
191 }
192
193
194 _log.info("Open files closed");
195
196 }
197
198 protected void initializeCollector(final BasherContext basherContext) throws Exception
199 {
200 try
201 {
202 _collectionDirectory = basherContext.getReportDirectory();
203
204 _log.info("Initializing collector");
205 _log.info("Report directory: " + _collectionDirectory);
206
207
208 _keepRunning = true;
209
210 _parent = new File(_collectionDirectory);
211
212 if (!_parent.exists())
213 {
214 if (!_parent.mkdir())
215 {
216 throw new Exception("Could not create root collection directory '" + _collectionDirectory + "'");
217 }
218 }
219 if (!_parent.canWrite())
220 {
221 throw new Exception("Root collection directory '" + _collectionDirectory + "' can not be written to");
222 }
223
224
225
226
227
228
229
230
231
232
233
234
235 _log.info("Initializing collection queue with capacity: " + _queueCapacity);
236 _fileEntries = new ArrayBlockingQueue<FileEntry>(_queueCapacity);
237
238 _fileEntryProcessors = new ArrayList<FileEntryProcessor>(_numThreads);
239
240 for (int i = 0; i < _numThreads; i++)
241 {
242 final FileEntryProcessor fileEntryProcessor = new FileEntryProcessor();
243 fileEntryProcessor.setName("FileEntryProcessor_" + i);
244 fileEntryProcessor.setDaemon(true);
245 fileEntryProcessor.start();
246 _fileEntryProcessors.add(fileEntryProcessor);
247 }
248
249 _log.debug("AbstractFileCollector initialized");
250 _ready = true;
251 }
252 catch (Exception e)
253 {
254 _log.error("Error initializing: " +e.getMessage(),e);
255 _log.error("Will not use filesystem");
256 _ready = false;
257 }
258 }
259
260
261
262
263
264
265
266
267 public void notRun( final TaskExecutionContext taskExecutionContext, final long elapsedTime, final long elapsedTimeNanos )
268 {
269 super.notRun(taskExecutionContext, elapsedTime, elapsedTimeNanos );
270 if (_ready &&isCollecting())
271 {
272 writeToFile(Type.NOT_RUN, taskExecutionContext.getTaskConfiguration().getTaskName(), elapsedTime, elapsedTimeNanos );
273 }
274 }
275
276
277
278
279
280
281 protected void dumpAverage(final Average average)
282 {
283 try
284 {
285 final String fileKey = _averageFilenamePrefix + _extension;
286 BufferedWriter bw = getWriter(fileKey, Classifier.AVERAGE);
287 bw.write(formatAverage(average));
288 bw.flush();
289 }
290 catch (IOException e)
291 {
292 _log.error(e.getMessage(), e);
293 }
294
295 }
296
297 protected abstract String formatAverage(final Average average);
298
299 protected abstract String formatExecution( final String taskName, final long elapsedTime, final long elapsedTimeNanos );
300
301
302
303
304
305
306
307
308
309 protected void writeToFile( final Type type, final String taskName, final long elapsedTime, final long elapsedTimeNanos )
310 {
311 try
312 {
313 final String fileKey = getFileName(taskName, type);
314 final String formattedText = formatExecution(taskName, elapsedTime, elapsedTimeNanos );
315 _fileEntries.put(new FileEntry(taskName, formattedText, fileKey, Classifier.EXECUTION));
316 }
317 catch (Exception e)
318 {
319 _log.error(e.getMessage(), e);
320 }
321 }
322
323 private String getFileName(final String taskName, final Type type)
324 {
325 switch (type)
326 {
327 case FAILURE:
328 return taskName + "-failed." + _extension;
329 case NOT_RUN:
330 return taskName + "-notrun." + _extension;
331 case SUCCESS:
332 return taskName + "-success." + _extension;
333 default:
334 throw new IllegalStateException("Unhandled type: " + type);
335 }
336 }
337
338
339
340
341
342
343
344 public Average markAverage()
345 {
346 final Average average = super.markAverage();
347 if (_ready && isCollecting())
348 {
349 dumpAverage(average);
350 }
351 return average;
352 }
353
354
355
356
357
358
359
360
361
362
363 protected BufferedWriter getWriter(final String fileKey, final Classifier classifier, Object... headerParams)
364 throws IOException
365 {
366 AbstractFileCollector.OpenFile openFile = _openFiles.get(fileKey);
367 if (openFile == null)
368 {
369 synchronized (_openFiles)
370 {
371 openFile = _openFiles.get(fileKey);
372 if (openFile == null)
373 {
374 final BufferedWriter bw = new BufferedWriter(new FileWriter(new File(_parent, fileKey)));
375 openFile = new OpenFile(bw, classifier);
376 addHeader(bw, classifier, headerParams);
377 bw.flush();
378 _openFiles.put(fileKey, openFile);
379 }
380 }
381 }
382 return openFile._bufferedWriter;
383 }
384
385 private void addFooter(final BufferedWriter bufferedWriter, final Classifier classifier) throws IOException
386 {
387 switch (classifier)
388 {
389 case AVERAGE:
390 if (_averageFooter != null)
391 {
392 bufferedWriter.write(_averageFooter);
393 }
394 break;
395 case EXECUTION:
396 if (_executionFooter != null)
397 {
398 bufferedWriter.write(_executionFooter);
399 }
400 break;
401 default:
402 throw new IllegalStateException("Unhandled classifier: " + classifier);
403 }
404 }
405
406 private void addHeader(final BufferedWriter bufferedWriter, final Classifier classifier, Object... headerParams) throws IOException
407 {
408 switch (classifier)
409 {
410 case AVERAGE:
411 if (_averageHeader != null)
412 {
413 bufferedWriter.write(String.format(_averageHeader,headerParams));
414 }
415 break;
416 case EXECUTION:
417 if (_executionHeader != null)
418 {
419 if (headerParams == null || headerParams.length == 0)
420 {
421 bufferedWriter.write(_executionHeader);
422 }
423 else
424 {
425 bufferedWriter.write(String.format(_executionHeader,headerParams));
426 }
427 }
428 break;
429 default:
430 throw new IllegalStateException("Unhandled classifier: " + classifier);
431 }
432 }
433
434 private class OpenFile
435 {
436 private final BufferedWriter _bufferedWriter;
437 private final Classifier _classifier;
438
439 public OpenFile(final BufferedWriter bufferedWriter, final Classifier classifier)
440 {
441 _bufferedWriter = bufferedWriter;
442 _classifier = classifier;
443 }
444
445 }
446
447 public class FileEntry
448 {
449 String _taskName;
450 String _formattedText;
451 String _fileKey;
452 Classifier _classifier;
453
454 public FileEntry(final String taskName, final String formattedText, final String fileKey, final Classifier classifier)
455 {
456 _taskName = taskName;
457 _formattedText = formattedText;
458 _fileKey = fileKey;
459 _classifier = classifier;
460
461 }
462 }
463
/**
 * Kind of file handled by the collector; selects which header and footer are written.
 * (A nested enum is implicitly static, so the modifier is not spelled out.)
 */
public enum Classifier
{
    /** The averages file. */
    AVERAGE,
    /** A per-task execution result file. */
    EXECUTION
}
468
/**
 * Outcome of a task execution; selects the result file an execution is written to.
 * (A nested enum is implicitly static, so the modifier is not spelled out.)
 */
public enum Type
{
    /** The task ran and succeeded. */
    SUCCESS,
    /** The task ran and failed. */
    FAILURE,
    /** The task was not run. */
    NOT_RUN
}
473
474 private class FileEntryProcessor extends Thread
475 {
476 public void run()
477 {
478 while (_keepRunning)
479 {
480 try
481 {
482
483 final AbstractFileCollector.FileEntry fileEntry = _fileEntries.poll(_timeOut, TimeUnit.MILLISECONDS);
484
485 if (fileEntry != null)
486 {
487
488 final BufferedWriter bw = getWriter(fileEntry._fileKey, fileEntry._classifier,fileEntry._taskName);
489 bw.write(fileEntry._formattedText);
490 bw.flush();
491 }
492 }
493 catch (Exception e)
494 {
495 _log.error(e.getMessage(), e);
496 }
497 }
498 }
499 }
500 }