1 | package de.uka.ipd.sdq.probespec.framework.concurrency; |
2 | |
3 | import java.util.concurrent.LinkedBlockingQueue; |
4 | |
5 | import org.apache.log4j.Logger; |
6 | |
7 | import de.uka.ipd.sdq.probespec.framework.ProbeSetAndRequestContext; |
8 | import de.uka.ipd.sdq.probespec.framework.ProbeSetSample; |
9 | import de.uka.ipd.sdq.probespec.framework.ProbeSpecContext; |
10 | import de.uka.ipd.sdq.probespec.framework.RequestContext; |
11 | import de.uka.ipd.sdq.probespec.framework.SampleBlackboard; |
12 | |
13 | /** |
14 | * Extends the sample blackboard by concurrency. By using this blackboard the |
15 | * thread adding samples to the blackboard gets decoupled from the processing of |
16 | * that samples within the blackboard. This is reached by means of a queue. When |
17 | * adding a sample it is enqueued and the adding thread can immediately proceed |
18 | * without waiting for the sample to be processed. The processing of the sample |
19 | * is conducted within a (single) other thread, whose only purpose is to process |
20 | * these samples. |
21 | * |
22 | * @author Philipp Merkle |
23 | * |
24 | */ |
25 | public class ConcurrentSampleBlackboard extends SampleBlackboard { |
26 | |
27 | private static Logger logger = Logger.getLogger(ConcurrentSampleBlackboard.class.getName()); |
28 | |
29 | private LinkedBlockingQueue<QueuedAction> sampleQueue; |
30 | private ThreadManager threadManager; |
31 | private boolean running; |
32 | |
33 | public ConcurrentSampleBlackboard(ThreadManager threadManager) { |
34 | this.sampleQueue = new LinkedBlockingQueue<QueuedAction>(); |
35 | this.threadManager = threadManager; |
36 | } |
37 | |
38 | private void delegateAddSample(ProbeSetSample pss) { |
39 | super.addSample(pss); |
40 | } |
41 | |
42 | private void delegateDeleteSamplesInRequestContext( |
43 | RequestContext requestContext) { |
44 | super.deleteSamplesInRequestContext(requestContext); |
45 | } |
46 | |
47 | private void delegateDeleteSample(ProbeSetAndRequestContext pss) { |
48 | super.deleteSample(pss); |
49 | } |
50 | |
51 | /** |
52 | * Adds a sample to the blackboard. The sample is enqueued as long as the |
53 | * queue does not overrun. Then the calling thread can proceed immediately. |
54 | * However when the queue overruns, the calling thread gets blocked until |
55 | * the queue is ready to carry the sample. |
56 | * <p> |
57 | * Notice: The sample will be not available at the blackboard until it is |
58 | * processed. |
59 | * |
60 | * @param pss |
61 | * the sample |
62 | */ |
63 | @Override |
64 | public void addSample(ProbeSetSample pss) { |
65 | checkRunning(); |
66 | try { |
67 | sampleQueue.put(new AddSampleAction(pss)); |
68 | } catch (InterruptedException e) { |
69 | // TODO Auto-generated catch block |
70 | e.printStackTrace(); |
71 | } |
72 | } |
73 | |
74 | /** |
75 | * Deletes all samples taken within the specified {@link RequestContext}. |
76 | * The deletion request is enqueued as long as the queue does not overrun. |
77 | * Then the calling thread can proceed immediately. However when the queue |
78 | * overruns, the calling thread gets blocked until the queue is ready to |
79 | * carry the request. |
80 | * |
81 | * @param requestContext |
82 | * the RequestContext whose samples are to be deleted |
83 | */ |
84 | @Override |
85 | public void deleteSamplesInRequestContext(RequestContext requestContext) { |
86 | checkRunning(); |
87 | try { |
88 | sampleQueue.put(new DeleteSamplesInRequestContextAction( |
89 | requestContext)); |
90 | } catch (InterruptedException e) { |
91 | // TODO Auto-generated catch block |
92 | e.printStackTrace(); |
93 | } |
94 | } |
95 | |
96 | /** |
97 | * Deletes the specified sample. The deletion request is enqueued as long as |
98 | * the queue does not overrun. Then the calling thread can proceed |
99 | * immediately. However when the queue overruns, the calling thread gets |
100 | * blocked until the queue is ready to carry the request. |
101 | * |
102 | * @param requestContext |
103 | * the samples |
104 | */ |
105 | @Override |
106 | public void deleteSample(ProbeSetAndRequestContext pss) { |
107 | checkRunning(); |
108 | try { |
109 | sampleQueue.put(new DeleteSampleAction(pss)); |
110 | } catch (InterruptedException e) { |
111 | // TODO Auto-generated catch block |
112 | e.printStackTrace(); |
113 | } |
114 | } |
115 | |
116 | private synchronized void checkRunning() { |
117 | if (!running) { |
118 | running = true; |
119 | threadManager.startThread(new ProcessQueuedActions(), "ProbeSpec Concurrent Blackboard"); |
120 | } |
121 | } |
122 | |
123 | /** |
124 | * This runnable will be used for the thread processing the samples and |
125 | * requests in the queue. |
126 | */ |
127 | private class ProcessQueuedActions implements StoppableRunnable { |
128 | |
129 | private boolean keepRunning = true; |
130 | |
131 | @Override |
132 | public void run() { |
133 | while (keepRunning || !sampleQueue.isEmpty()) { |
134 | try { |
135 | sampleQueue.take().execute(); |
136 | } catch (InterruptedException e) { |
137 | // TODO Auto-generated catch block |
138 | e.printStackTrace(); |
139 | } |
140 | } |
141 | logger.debug("Runnable " + this.getClass().getSimpleName() |
142 | + " stopped running"); |
143 | } |
144 | |
145 | @Override |
146 | public void stop() { |
147 | keepRunning = false; |
148 | if (sampleQueue.isEmpty()) { |
149 | try { |
150 | sampleQueue.put(new ShutdownQueueAction()); |
151 | } catch (InterruptedException e) { |
152 | // TODO Auto-generated catch block |
153 | e.printStackTrace(); |
154 | } |
155 | } |
156 | } |
157 | |
158 | } |
159 | |
160 | /** |
161 | * A queued action encapsulates some actions that can be added to a queue. |
162 | */ |
163 | private interface QueuedAction { |
164 | |
165 | public void execute(); |
166 | |
167 | } |
168 | |
169 | /** |
170 | * This action does nothing and can be used to shutdown the thread |
171 | * processing queue entries. |
172 | */ |
173 | private class ShutdownQueueAction implements QueuedAction { |
174 | |
175 | @Override |
176 | public void execute() { |
177 | // Nothing to do |
178 | } |
179 | |
180 | } |
181 | |
182 | /** |
183 | * Adds a sample to the blackboard. |
184 | */ |
185 | private class AddSampleAction implements QueuedAction { |
186 | |
187 | private ProbeSetSample pss; |
188 | |
189 | public AddSampleAction(ProbeSetSample pss) { |
190 | this.pss = pss; |
191 | } |
192 | |
193 | @Override |
194 | public void execute() { |
195 | delegateAddSample(pss); |
196 | } |
197 | |
198 | } |
199 | |
200 | private class DeleteSamplesInRequestContextAction implements QueuedAction { |
201 | |
202 | private RequestContext requestContext; |
203 | |
204 | public DeleteSamplesInRequestContextAction(RequestContext requestContext) { |
205 | this.requestContext = requestContext; |
206 | } |
207 | |
208 | @Override |
209 | public void execute() { |
210 | delegateDeleteSamplesInRequestContext(requestContext); |
211 | } |
212 | |
213 | } |
214 | |
215 | private class DeleteSampleAction implements QueuedAction { |
216 | |
217 | private ProbeSetAndRequestContext pss; |
218 | |
219 | public DeleteSampleAction(ProbeSetAndRequestContext pss) { |
220 | this.pss = pss; |
221 | } |
222 | |
223 | @Override |
224 | public void execute() { |
225 | delegateDeleteSample(pss); |
226 | } |
227 | |
228 | } |
229 | |
230 | } |