1 | package de.uka.ipd.sdq.simucomframework.calculator; |
2 | |
3 | import java.util.Vector; |
4 | import java.util.concurrent.LinkedBlockingQueue; |
5 | |
6 | import javax.measure.Measure; |
7 | import javax.measure.quantity.Quantity; |
8 | |
9 | import org.apache.log4j.Logger; |
10 | import org.eclipse.core.runtime.CoreException; |
11 | |
12 | import de.uka.ipd.sdq.pipesandfilters.framework.MetaDataInit; |
13 | import de.uka.ipd.sdq.pipesandfilters.framework.PipeData; |
14 | import de.uka.ipd.sdq.pipesandfilters.framework.PipesAndFiltersManager; |
15 | import de.uka.ipd.sdq.pipesandfilters.framework.recorder.IRawWriteStrategy; |
16 | import de.uka.ipd.sdq.pipesandfilters.framework.recorder.RawRecorder; |
17 | import de.uka.ipd.sdq.pipesandfilters.framework.recorder.Recorder; |
18 | import de.uka.ipd.sdq.pipesandfilters.framework.recorder.launch.RecorderExtensionHelper; |
19 | import de.uka.ipd.sdq.probespec.framework.calculator.Calculator; |
20 | import de.uka.ipd.sdq.probespec.framework.calculator.ICalculatorListener; |
21 | import de.uka.ipd.sdq.simucomframework.model.SimuComModel; |
22 | |
23 | public class SetupConcurrentPipesAndFiltersStrategy implements ISetupDataSinkStrategy { |
24 | /** Logger for this class. */ |
25 | private static final Logger logger = Logger.getLogger(SetupConcurrentPipesAndFiltersStrategy.class); |
26 | |
27 | private SimuComModel model; |
28 | |
29 | public SetupConcurrentPipesAndFiltersStrategy(SimuComModel model) { |
30 | this.model = model; |
31 | } |
32 | |
33 | public PipesAndFiltersManager setupDataSink(Calculator calculator, MetaDataInit metaData) { |
34 | // Initialize recorder and Pipes-and-Filters-Manger |
35 | Recorder recorder = new RawRecorder(createWriteStrategy()); |
36 | final PipesAndFiltersManager pipeManager = new PipesAndFiltersManager( |
37 | recorder); |
38 | pipeManager.initialize(metaData); |
39 | |
40 | // start thread |
41 | final ProcessPipeData processPipeData = new ProcessPipeData(pipeManager); |
42 | new Thread(processPipeData).start(); |
43 | |
44 | calculator.addCalculatorListener(new ICalculatorListener() { |
45 | public void calculated(Vector<Measure<?, ? extends Quantity>> resultTuple) { |
46 | processPipeData.enqueue(new PipeData(resultTuple)); |
47 | } |
48 | }); |
49 | |
50 | return pipeManager; |
51 | } |
52 | |
53 | private IRawWriteStrategy createWriteStrategy() { |
54 | try { |
55 | String writeStrategyClass = RecorderExtensionHelper |
56 | .getWriteStrategyClassNameForName(model.getConfig() |
57 | .getRecorderName()); |
58 | return (IRawWriteStrategy) Class.forName(writeStrategyClass).newInstance(); |
59 | } catch (CoreException e) { |
60 | logger.error("Error occured during write strategy creation.", e); |
61 | } catch (InstantiationException e) { |
62 | logger.error("Error occured during write strategy creation.", e); |
63 | } catch (IllegalAccessException e) { |
64 | logger.error("Error occured during write strategy creation.", e); |
65 | } catch (ClassNotFoundException e) { |
66 | logger.error("Error occured during write strategy creation.", e); |
67 | } |
68 | return null; |
69 | } |
70 | |
71 | private class ProcessPipeData implements Runnable { |
72 | |
73 | private LinkedBlockingQueue<PipeData> pipeQueue; |
74 | |
75 | private boolean keepRunning = true; |
76 | |
77 | private PipesAndFiltersManager pipeManager; |
78 | |
79 | public ProcessPipeData(PipesAndFiltersManager pipeManager) { |
80 | this.pipeManager = pipeManager; |
81 | pipeQueue = new LinkedBlockingQueue<PipeData>(); |
82 | } |
83 | |
84 | public void run() { |
85 | while(keepRunning) { |
86 | try { |
87 | pipeManager.processData(pipeQueue.take()); |
88 | } catch (InterruptedException e) { |
89 | logger.error("Could not process data in the pipe.", e); |
90 | } |
91 | } |
92 | } |
93 | |
94 | public void stop() { |
95 | keepRunning = false; |
96 | } |
97 | |
98 | public void enqueue(PipeData data) { |
99 | try { |
100 | pipeQueue.put(data); |
101 | } catch (InterruptedException e) { |
102 | logger.error("Could not enqueue data in pipe.", e); |
103 | } |
104 | } |
105 | |
106 | } |
107 | } |