View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    * http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package net.sourceforge.basher.impl;
16  
17  import java.io.*;
18  import java.util.*;
19  import java.util.concurrent.ArrayBlockingQueue;
20  import java.util.concurrent.BlockingQueue;
21  import java.util.concurrent.TimeUnit;
22  
23  import net.sourceforge.basher.Average;
24  import net.sourceforge.basher.Task;
25  import net.sourceforge.basher.BasherContext;
26  import net.sourceforge.basher.TaskExecutionContext;
27  
28  /**
29   * @author Johan Lindquist
30   * @version $Revision$
31   */
32  public abstract class AbstractFileCollector extends AbstractCollector
33  {
34      protected final Map<String, OpenFile> _openFiles = new HashMap<String, OpenFile>();
35      protected File _parent;
36      protected String _collectionDirectory;
37  
38      private String _averageHeader = null;
39      private String _averageFooter = null;
40      private String _executionHeader = null;
41      private String _executionFooter = null;
42  
43      private String _averageFilenamePrefix = "averages.";
44      private String _extension;
45  
46      private int _queueCapacity = 200;
47      private BlockingQueue<FileEntry> _fileEntries;
48      private int _numThreads = 5;
49      private List<FileEntryProcessor> _fileEntryProcessors;
50      private long _timeOut = 500;
51      private boolean _keepRunning = true;
52      private boolean _ready = true;
53  
54      public void setNumThreads(final int numThreads)
55      {
56          _numThreads = numThreads;
57      }
58  
59      public void setTimeOut(final long timeOut)
60      {
61          _timeOut = timeOut;
62      }
63  
64      public void setQueueCapacity(final int queueCapacity)
65      {
66          _queueCapacity = queueCapacity;
67      }
68  
69      public void setAverageFooter(final String averageFooter)
70      {
71          _averageFooter = averageFooter;
72      }
73  
74      public void setExecutionFooter(final String executionFooter)
75      {
76          _executionFooter = executionFooter;
77      }
78  
79      public void setAverageHeader(final String averageHeader)
80      {
81          _averageHeader = averageHeader;
82      }
83  
84      public void setExecutionHeader(final String executionHeader)
85      {
86          _executionHeader = executionHeader;
87      }
88  
89      public void setExtension(final String extension)
90      {
91          _extension = extension;
92      }
93  
94      public void setAverageFilenamePrefix(final String averageFilenamePrefix)
95      {
96          _averageFilenamePrefix = averageFilenamePrefix;
97      }
98  
99      /**
100      * Initializes the service.
101      *
102      * @throws Exception If the script directory does not exist or can not be written to
103      */
104     public void initializeService() throws Exception
105     {
106 
107     }
108 
109     /**
110      * Writes the result of a successful task execution to the success file.
111      *
112      * @param taskExecutionContext        The task execution context wrapping the executed task.
113      * @param elapsedTime Time of execution
114      * @param elapsedTimeNanos
115      */
116     public void success( final TaskExecutionContext taskExecutionContext, final long elapsedTime, final long elapsedTimeNanos )
117     {
118         super.success(taskExecutionContext, elapsedTime, elapsedTimeNanos );
119 
120         if (_ready && isCollecting())
121         {
122             writeToFile(Type.SUCCESS, taskExecutionContext.getTaskConfiguration().getTaskName(), elapsedTime, elapsedTimeNanos );
123         }
124     }
125 
126     /**
127      * Writes the result of a failed task to the failure file.
128      *
129      * @param taskExecutionContext         The task execution context wrapping the task that failed.
130      * @param elapsedTime Time of execution
131      * @param elapsedTimeNanos
132      * @param throwable   The reason (if specified) for the failure
133      */
134     public void fail( final TaskExecutionContext taskExecutionContext, final long elapsedTime, final long elapsedTimeNanos, final Throwable throwable )
135     {
136         super.fail(taskExecutionContext, elapsedTime, elapsedTimeNanos, throwable);
137         if (_ready && isCollecting())
138         {
139             writeToFile(Type.FAILURE, taskExecutionContext.getTaskConfiguration().getTaskName(), elapsedTime, elapsedTimeNanos );
140         }
141     }
142 
143     @Override
144     protected void closeOpenResources()
145     {
146         _log.info("Closing open resources");
147         _log.info("Collection queue size: " + _fileEntries.size());
148         if (_fileEntries.size() > 0)
149         {
150             _log.info("Draining collection queue");
151             while (_fileEntries.size() > 0)
152             {
153                 try
154                 {
155                     Thread.sleep(250);
156                 }
157                 catch (InterruptedException e)
158                 {
159                     // Safe to ignore
160                 }
161             }
162         }
163 
164         _keepRunning = false;
165 
166         _log.info("Closing open files");
167         _log.info("Files open: " + _openFiles.size());
168 
169         final Collection<OpenFile> entries = _openFiles.values();
170         try
171         {
172             for (final OpenFile entry : entries)
173             {
174                 try
175                 {
176                     final Writer writer = entry._bufferedWriter;
177                     addFooter(entry._bufferedWriter, entry._classifier);
178                     writer.flush();
179                     writer.close();
180                 }
181                 catch (IOException e)
182                 {
183                     _log.error(e.getMessage(), e);
184                 }
185             }
186         }
187         finally
188         {
189             // Finally, clear out the list of open files
190             _openFiles.clear();
191         }
192 
193 
194         _log.info("Open files closed");
195         
196     }
197 
198     protected void initializeCollector(final BasherContext basherContext) throws Exception
199     {
200         try
201         {
202             _collectionDirectory = basherContext.getReportDirectory();
203 
204             _log.info("Initializing collector");
205             _log.info("Report directory: " + _collectionDirectory);
206 
207             // Important - allows the file entry processors to continue 
208             _keepRunning = true;
209 
210             _parent = new File(_collectionDirectory);
211 
212             if (!_parent.exists())
213             {
214                 if (!_parent.mkdir())
215                 {
216                     throw new Exception("Could not create root collection directory '" + _collectionDirectory + "'");
217                 }
218             }
219             if (!_parent.canWrite())
220             {
221                 throw new Exception("Root collection directory '" + _collectionDirectory + "' can not be written to");
222             }
223 
224             /*// Ok, the directory is available and writable
225             _parent = new File(_parent, "" + _timeSource.getStartTime());
226 
227             if (!_parent.exists())
228             {
229                 if (!_parent.mkdir())
230                 {
231                     throw new Exception("Could not create collection directory '" + _parent.getAbsolutePath() + "'");
232                 }
233             }*/
234 
235             _log.info("Initializing collection queue with capacity: " + _queueCapacity);
236             _fileEntries = new ArrayBlockingQueue<FileEntry>(_queueCapacity);
237 
238             _fileEntryProcessors = new ArrayList<FileEntryProcessor>(_numThreads);
239 
240             for (int i = 0; i < _numThreads; i++)
241             {
242                 final FileEntryProcessor fileEntryProcessor = new FileEntryProcessor();
243                 fileEntryProcessor.setName("FileEntryProcessor_" + i);
244                 fileEntryProcessor.setDaemon(true);
245                 fileEntryProcessor.start();
246                 _fileEntryProcessors.add(fileEntryProcessor);
247             }
248 
249             _log.debug("AbstractFileCollector initialized");
250             _ready = true;
251         }
252         catch (Exception e)
253         {
254             _log.error("Error initializing: " +e.getMessage(),e);
255             _log.error("Will not use filesystem");
256             _ready = false;
257         }
258     }
259 
260     /**
261      * Writes the result of a not tun task execution to the notrun file.
262      *
263      * @param taskExecutionContext        The task that didn't run.
264      * @param elapsedTime Time of execution
265      * @param elapsedTimeNanos
266      */
267     public void notRun( final TaskExecutionContext taskExecutionContext, final long elapsedTime, final long elapsedTimeNanos )
268     {
269         super.notRun(taskExecutionContext, elapsedTime, elapsedTimeNanos );
270         if (_ready &&isCollecting())
271         {
272             writeToFile(Type.NOT_RUN, taskExecutionContext.getTaskConfiguration().getTaskName(), elapsedTime, elapsedTimeNanos );
273         }
274     }
275 
276     /**
277      * Writes the specified <code>Average</code> to the averag file.
278      *
279      * @param average The average to write to disk
280      */
281     protected void dumpAverage(final Average average)
282     {
283         try
284         {
285             final String fileKey = _averageFilenamePrefix + _extension;
286             BufferedWriter bw = getWriter(fileKey, Classifier.AVERAGE);
287             bw.write(formatAverage(average));
288             bw.flush();
289         }
290         catch (IOException e)
291         {
292             _log.error(e.getMessage(), e);
293         }
294 
295     }
296 
297     protected abstract String formatAverage(final Average average);
298 
299     protected abstract String formatExecution( final String taskName, final long elapsedTime, final long elapsedTimeNanos );
300 
301     /**
302      * Writes the time the specified task took to execute.
303      *
304      * @param type
305      * @param taskName    The name of the task (used for the filename as well)
306      * @param elapsedTime Time the task took to execute
307      * @param elapsedTimeNanos
308      */
309     protected void writeToFile( final Type type, final String taskName, final long elapsedTime, final long elapsedTimeNanos )
310     {
311         try
312         {
313             final String fileKey = getFileName(taskName, type);
314             final String formattedText = formatExecution(taskName, elapsedTime, elapsedTimeNanos );
315             _fileEntries.put(new FileEntry(taskName, formattedText, fileKey, Classifier.EXECUTION));
316         }
317         catch (Exception e)
318         {
319             _log.error(e.getMessage(), e);
320         }
321     }
322 
323     private String getFileName(final String taskName, final Type type)
324     {
325         switch (type)
326         {
327             case FAILURE:
328                 return taskName + "-failed." + _extension;
329             case NOT_RUN:
330                 return taskName + "-notrun." + _extension;
331             case SUCCESS:
332                 return taskName + "-success." + _extension;
333             default:
334                 throw new IllegalStateException("Unhandled type: " + type);
335         }
336     }
337 
338     /**
339      * Ensures the average over the last time slice is written to disk.
340      * {@inheritDoc}
341      *
342      * @return The average over the last time slice.
343      */
344     public Average markAverage()
345     {
346         final Average average = super.markAverage();
347         if (_ready && isCollecting())
348         {
349             dumpAverage(average);
350         }
351         return average;
352     }
353 
354 
355     /**
356      * Retrieves (and possibly creates) a new writer on to which to write data
357      *
358      * @param fileKey    The identifier of the buffer
359      * @param classifier
360      * @return The buffer
361      * @throws java.io.IOException If the retrieval (create) fails
362      */
363     protected BufferedWriter getWriter(final String fileKey, final Classifier classifier, Object... headerParams)
364             throws IOException
365     {
366         AbstractFileCollector.OpenFile openFile = _openFiles.get(fileKey);
367         if (openFile == null)
368         {
369             synchronized (_openFiles)
370             {
371                 openFile = _openFiles.get(fileKey);
372                 if (openFile == null)
373                 {
374                     final BufferedWriter bw = new BufferedWriter(new FileWriter(new File(_parent, fileKey)));
375                     openFile = new OpenFile(bw, classifier);
376                     addHeader(bw, classifier, headerParams);
377                     bw.flush();
378                     _openFiles.put(fileKey, openFile);
379                 }
380             }
381         }
382         return openFile._bufferedWriter;
383     }
384 
385     private void addFooter(final BufferedWriter bufferedWriter, final Classifier classifier) throws IOException
386     {
387         switch (classifier)
388         {
389             case AVERAGE:
390                 if (_averageFooter != null)
391                 {
392                     bufferedWriter.write(_averageFooter);
393                 }
394                 break;
395             case EXECUTION:
396                 if (_executionFooter != null)
397                 {
398                     bufferedWriter.write(_executionFooter);
399                 }
400                 break;
401             default:
402                 throw new IllegalStateException("Unhandled classifier: " + classifier);
403         }
404     }
405 
406     private void addHeader(final BufferedWriter bufferedWriter, final Classifier classifier, Object... headerParams) throws IOException
407     {
408         switch (classifier)
409         {
410             case AVERAGE:
411                 if (_averageHeader != null)
412                 {
413                     bufferedWriter.write(String.format(_averageHeader,headerParams));
414                 }
415                 break;
416             case EXECUTION:
417                 if (_executionHeader != null)
418                 {
419                     if (headerParams == null || headerParams.length == 0)
420                     {
421                         bufferedWriter.write(_executionHeader);
422                     }
423                     else
424                     {
425                         bufferedWriter.write(String.format(_executionHeader,headerParams));
426                     }
427                 }
428                 break;
429             default:
430                 throw new IllegalStateException("Unhandled classifier: " + classifier);
431         }
432     }
433 
434     private class OpenFile
435     {
436         private final BufferedWriter _bufferedWriter;
437         private final Classifier _classifier;
438 
439         public OpenFile(final BufferedWriter bufferedWriter, final Classifier classifier)
440         {
441             _bufferedWriter = bufferedWriter;
442             _classifier = classifier;
443         }
444 
445     }
446 
447     public class FileEntry
448     {
449         String _taskName;
450         String _formattedText;
451         String _fileKey;
452         Classifier _classifier;
453 
454         public FileEntry(final String taskName, final String formattedText, final String fileKey, final Classifier classifier)
455         {
456             _taskName = taskName;
457             _formattedText = formattedText;
458             _fileKey = fileKey;
459             _classifier = classifier;
460 
461         }
462     }
463 
464     public static enum Classifier
465     {
466         AVERAGE, EXECUTION
467     }
468 
469     public static enum Type
470     {
471         SUCCESS, FAILURE, NOT_RUN
472     }
473 
474     private class FileEntryProcessor extends Thread
475     {
476         public void run()
477         {
478             while (_keepRunning)
479             {
480                 try
481                 {
482                     // Consume first entry
483                     final AbstractFileCollector.FileEntry fileEntry = _fileEntries.poll(_timeOut, TimeUnit.MILLISECONDS);
484 
485                     if (fileEntry != null)
486                     {
487                         // Write it to the file
488                         final BufferedWriter bw = getWriter(fileEntry._fileKey, fileEntry._classifier,fileEntry._taskName);
489                         bw.write(fileEntry._formattedText);
490                         bw.flush();
491                     }
492                 }
493                 catch (Exception e)
494                 {
495                     _log.error(e.getMessage(), e);
496                 }
497             }
498         }
499     }
500 }