1 | package de.uka.ipd.sdq.scheduler.resources.active.special; |
2 | |
3 | import java.util.ArrayList; |
4 | import java.util.Hashtable; |
5 | import java.util.Random; |
6 | import java.util.Map.Entry; |
7 | |
8 | import de.uka.ipd.sdq.probfunction.math.util.MathTools; |
9 | import de.uka.ipd.sdq.scheduler.IRunningProcess; |
10 | import de.uka.ipd.sdq.scheduler.ISchedulableProcess; |
11 | import de.uka.ipd.sdq.scheduler.LoggingWrapper; |
12 | import de.uka.ipd.sdq.scheduler.SchedulerModel; |
13 | import de.uka.ipd.sdq.scheduler.resources.active.AbstractActiveResource; |
14 | import de.uka.ipd.sdq.scheduler.resources.active.SimResourceInstance; |
15 | import de.uka.ipd.sdq.simulation.abstractsimengine.AbstractSimEventDelegator; |
16 | import de.uka.ipd.sdq.simulation.abstractsimengine.IEntity; |
17 | import de.uka.ipd.sdq.simulation.abstractsimengine.NullEntity; |
18 | |
19 | /** |
20 | * This class is for testing purposes only. It is used for the MASCOTS paper case study. |
21 | * @author hauck |
22 | * |
23 | */ |
24 | public class SimProcessorSharingResourceLinuxO1 extends AbstractActiveResource { |
25 | |
26 | private class DoLoadBalancingEvent extends AbstractSimEventDelegator<NullEntity> { |
27 | |
28 | public DoLoadBalancingEvent(SchedulerModel model) { |
29 | super(model, SimProcessorSharingResourceLinuxO1.class.getName()); |
30 | } |
31 | |
32 | |
33 | @Override |
34 | public void eventRoutine(NullEntity who) { |
35 | //System.out.println(simulator.time() + ": Trying load balancing..."); |
36 | int coreToBalanceTo = getCoreWithShortestQueue(); |
37 | int coreToBalanceFrom = getCoreWithLongestQueue(); |
38 | if ((running_processesPerCore.get(coreToBalanceFrom).size() - running_processesPerCore.get(coreToBalanceTo).size()) > 1) { |
39 | // We have two cores that have a running queue length difference greater than 1. Do load balancing. |
40 | |
41 | // select a random process from the sender core |
42 | Hashtable<ISchedulableProcess, Double> runningProcesses = running_processesPerCore.get(coreToBalanceFrom); |
43 | ISchedulableProcess[] processes = runningProcesses.keySet().toArray(new ISchedulableProcess[]{}); |
44 | |
45 | // move random process from sender core to idle core |
46 | Random random = new Random(); |
47 | ISchedulableProcess processToBalance = processes[random.nextInt(processes.length)]; |
48 | double simTime = getModel().getSimulationControl().getCurrentSimulationTime(); |
49 | System.out.println(simTime + ": Balancing process: " + processToBalance.getId() + " from core " + coreToBalanceFrom + " to " + coreToBalanceTo); |
50 | Double processValue = runningProcesses.get(processToBalance); |
51 | runningProcesses.remove(processToBalance); |
52 | putProcessOnCore(processToBalance, processValue, coreToBalanceTo); |
53 | |
54 | } else { |
55 | // System.out.println(simulator.time() + ": No load balancing needed."); |
56 | } |
57 | |
58 | } |
59 | |
60 | } |
61 | |
62 | private class ProcessingFinishedEvent extends AbstractSimEventDelegator<ISchedulableProcess> { |
63 | |
64 | public ProcessingFinishedEvent(SchedulerModel model) { |
65 | super(model, ProcessingFinishedEvent.class.getName()); |
66 | } |
67 | |
68 | @Override |
69 | public void eventRoutine(ISchedulableProcess process) { |
70 | ISchedulableProcess last = process; |
71 | toNow(); |
72 | // NEW |
73 | int core = getCoreOfARunningProcess(last); |
74 | running_processesPerCore.get(core).remove(last); |
75 | // running_processes.remove(last); |
76 | // TODO: now, we have to check, if we have to perform load |
77 | // balancing. |
78 | // And probably re-calculate times? |
79 | |
80 | // suggestion: |
81 | if (running_processesPerCore.get(core).size() == 0) { |
82 | int coreToBalanceFrom = getCoreWithLongestQueue(); |
83 | if (running_processesPerCore.get(coreToBalanceFrom).size() <= 1) { |
84 | // all cores are idle or have no contention |
85 | } else { |
86 | // Try load balancing one time unit from now |
87 | DoLoadBalancingEvent event = new DoLoadBalancingEvent(SimProcessorSharingResourceLinuxO1.this |
88 | .getModel()); |
89 | double simTime = getModel().getSimulationControl().getCurrentSimulationTime(); |
90 | event.schedule(IEntity.NULL, simTime+1); |
91 | } |
92 | } |
93 | // System.out.println(simulator.time() + ": " + last.getId() + " finished"); |
94 | // LoggingWrapper.log(last + " finished."); |
95 | scheduleNextEvent(); |
96 | last.activate(); |
97 | } |
98 | |
99 | } |
100 | |
101 | private ProcessingFinishedEvent processingFinished = new ProcessingFinishedEvent(null); |
102 | private ArrayList<Hashtable<ISchedulableProcess,Double>> running_processesPerCore = new ArrayList<Hashtable<ISchedulableProcess, Double>>(); |
103 | // private Hashtable<ISchedulableProcess,Double> running_processes = new |
104 | // Hashtable<ISchedulableProcess, Double>(); |
105 | private double last_time; |
106 | private int coreToUseForInitialLoadBalancing = 0; |
107 | |
108 | public SimProcessorSharingResourceLinuxO1(SchedulerModel model, String name, String id, int numberOfCores) { |
109 | super(model, numberOfCores, name, id); |
110 | for (int j=0; j<numberOfCores; j++) { |
111 | running_processesPerCore.add(new Hashtable<ISchedulableProcess, Double>()); |
112 | } |
113 | } |
114 | |
115 | |
116 | |
117 | public void scheduleNextEvent() { |
118 | /** |
119 | * New: look in all queues, i.e. in all nested running_processes |
120 | * hashtables, which process is to be scheduled next. |
121 | */ |
122 | ISchedulableProcess shortest = null; |
123 | Double shortestTime = 0.0; |
124 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
125 | for (ISchedulableProcess process : running_processes.keySet()) { |
126 | // System.out.println("Time: " + simulator.time() + ", looking for shortest time: " + process.getId() + " time: " + running_processes.get(process) + ", speed: " + getSpeed(process)); |
127 | if (shortest == null || shortestTime > running_processes.get(process) * getSpeed(process)){ |
128 | shortest = process; |
129 | shortestTime = running_processes.get(process) * getSpeed(process); |
130 | // System.out.println("Shortest: " + shortest.getId() + ", shortest time: " + shortestTime); |
131 | } |
132 | } |
133 | } |
134 | |
135 | processingFinished.removeEvent(); |
136 | if (shortest!=null){ |
137 | // New: calculate time for process |
138 | double time = shortestTime;// * getSpeed(shortest); |
139 | // double time = running_processes.get(shortest) * getSpeed(); |
140 | // System.out.println("Time: " + simulator.time() + ", scheduling event at " + time); |
141 | if (!MathTools.less(0, time)) { |
142 | time = 0.0; |
143 | } |
144 | processingFinished.schedule(shortest, time); |
145 | } |
146 | } |
147 | |
148 | private int getCoreOfARunningProcess(ISchedulableProcess process) { |
149 | for (int i=0; i<running_processesPerCore.size(); i++) { |
150 | Hashtable<ISchedulableProcess, Double> running_processes = running_processesPerCore.get(i); |
151 | if (running_processes.containsKey(process)) { |
152 | return i; |
153 | } |
154 | } |
155 | LoggingWrapper.logger.warn("Core of process not found. Returning core 0."); |
156 | return 0; |
157 | } |
158 | |
159 | private int getCoreWithLongestQueue() { |
160 | int coreWithLongestQueue = 0; |
161 | int queueSize = 0; |
162 | for (int i=0; i<running_processesPerCore.size(); i++) { |
163 | if (running_processesPerCore.get(i).size() > queueSize) { |
164 | queueSize = running_processesPerCore.get(i).size(); |
165 | coreWithLongestQueue = i; |
166 | } |
167 | } |
168 | return coreWithLongestQueue; |
169 | } |
170 | |
171 | private int getCoreWithShortestQueue() { |
172 | int coreWithShortestQueue = -1; |
173 | int queueSize = 0; |
174 | for (int i=0; i<running_processesPerCore.size(); i++) { |
175 | if (coreWithShortestQueue == -1) { |
176 | queueSize = running_processesPerCore.get(i).size(); |
177 | coreWithShortestQueue = i; |
178 | } else { |
179 | if (running_processesPerCore.get(i).size() < queueSize) { |
180 | queueSize = running_processesPerCore.get(i).size(); |
181 | coreWithShortestQueue = i; |
182 | } |
183 | } |
184 | } |
185 | return coreWithShortestQueue; |
186 | } |
187 | |
188 | |
189 | private void toNow() { |
190 | double now = getModel().getSimulationControl().getCurrentSimulationTime(); |
191 | double passed_time = now - last_time; |
192 | // System.out.println("toNow: " + now + " - " + last_time + " = " + |
193 | // passed_time); |
194 | if (MathTools.less(0, passed_time)){ |
195 | // passed_time /= getSpeed(); |
196 | // NEW |
197 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
198 | for (Entry<ISchedulableProcess,Double> e : running_processes.entrySet()) { |
199 | double processPassedTime = passed_time / getSpeed(e.getKey()); |
200 | double rem = e.getValue() - processPassedTime; |
201 | // System.out.println("toNow " + e.getKey().getId() + ": " + |
202 | // e.getValue() + " - " + processPassedTime + " = " + rem); |
203 | e.setValue(rem); |
204 | } |
205 | } |
206 | } |
207 | last_time = now; |
208 | } |
209 | |
210 | @Override |
211 | public double getRemainingDemand(ISchedulableProcess process) { |
212 | boolean hasDemand = false; |
213 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
214 | if (running_processes.containsKey(process)) { |
215 | hasDemand = true; |
216 | break; |
217 | } |
218 | } |
219 | if (hasDemand == false) { |
220 | return 0.0; |
221 | } |
222 | toNow(); |
223 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
224 | if (!running_processes.contains(process)) { |
225 | return running_processes.get(process); |
226 | } |
227 | } |
228 | // Should not be reached. |
229 | return 0.0; |
230 | } |
231 | |
232 | @Override |
233 | public void updateDemand(ISchedulableProcess process, double demand) { |
234 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
235 | for (Entry<ISchedulableProcess,Double> e : running_processes.entrySet()) { |
236 | if (e.getKey().equals(process)) { |
237 | e.setValue(demand); |
238 | break; |
239 | } |
240 | } |
241 | } |
242 | scheduleNextEvent(); |
243 | } |
244 | |
245 | // New: calculate speed for a process. |
246 | private double getSpeed(ISchedulableProcess process) { |
247 | int core = getCoreOfARunningProcess(process); |
248 | double speed = (double)running_processesPerCore.get(core).size(); |
249 | // double speed = (double)running_processes.size() / |
250 | // (double)getCapacity(); |
251 | |
252 | // comparison here is unnecessary, speed cannot be lower than 1. Keep it |
253 | // anyway. |
254 | return speed < 1.0 ? 1.0 : speed; |
255 | } |
256 | |
257 | |
258 | public void start() { |
259 | } |
260 | |
261 | |
262 | @Override |
263 | protected void dequeue(ISchedulableProcess process) { |
264 | } |
265 | |
266 | |
267 | @Override |
268 | protected void doProcessing(ISchedulableProcess process, int resourceServiceID, double demand) { |
269 | toNow(); |
270 | LoggingWrapper.log("PS: " + process + " demands " + demand); |
271 | //System.out.println("PS: " + process.getId() + " demands " + demand); |
272 | |
273 | int coreToPutOn = getLastCoreProcessWasRunningOn(process); |
274 | if (coreToPutOn == -1) { |
275 | // This is a new process which has issued demand for the first time. |
276 | // Same as parent: always use core 0. Check if load balancing has to be done afterwards. |
277 | // If there are tasks running on core 0, use a random core. |
278 | coreToUseForInitialLoadBalancing = 0; |
279 | if (running_processesPerCore.get(coreToUseForInitialLoadBalancing).size() > 0) { |
280 | coreToUseForInitialLoadBalancing = new Random().nextInt(getCapacity()); |
281 | } |
282 | putProcessOnCore(process, demand, coreToUseForInitialLoadBalancing); |
283 | // running_processes.put(process, demand); |
284 | int coreToBalanceFrom = getCoreWithLongestQueue(); |
285 | if (running_processesPerCore.get(coreToBalanceFrom).size() <= 1) { |
286 | // all cores are idle or have no contention |
287 | } else { |
288 | // Try load balancing one time unit from now |
289 | DoLoadBalancingEvent event = new DoLoadBalancingEvent(getModel()); |
290 | double simTime = getModel().getSimulationControl().getCurrentSimulationTime(); |
291 | event.schedule(IEntity.NULL, simTime+1); |
292 | } |
293 | } else { |
294 | putProcessOnCore(process, demand, coreToPutOn); |
295 | // running_processesPerCore.get(coreToPutOn).put(process, demand); |
296 | } |
297 | |
298 | // I don't know if this is right here. |
299 | // I call toNow() again to update all processes (some processes might |
300 | // now share the core with the new process) |
301 | toNow(); |
302 | scheduleNextEvent(); |
303 | process.passivate(); |
304 | } |
305 | |
306 | @Override |
307 | protected void enqueue(ISchedulableProcess process) { |
308 | } |
309 | |
310 | |
311 | public void stop() { |
312 | |
313 | } |
314 | |
315 | public void registerProcess(IRunningProcess runningProcess) { |
316 | } |
317 | |
318 | |
319 | public int getQueueLengthFor(SimResourceInstance simResourceInstance) { |
320 | // TODO where is this needed? Return hard coded queue length of first |
321 | // core. |
322 | return this.running_processesPerCore.get(0).size(); |
323 | // return this.running_processes.size(); |
324 | } |
325 | |
326 | private Hashtable<ISchedulableProcess,Integer> all_processes = new Hashtable<ISchedulableProcess, Integer>(); |
327 | |
328 | /** |
329 | * return -1 if a process was not running before, i.e. is a new process. |
330 | * |
331 | * @param process |
332 | * @return |
333 | */ |
334 | private int getLastCoreProcessWasRunningOn(ISchedulableProcess process) { |
335 | if (all_processes.containsKey(process)) { |
336 | return all_processes.get(process); |
337 | } |
338 | return -1; |
339 | } |
340 | |
341 | private void putProcessOnCore(ISchedulableProcess process, double demand, int core) { |
342 | if (all_processes.containsKey(process)) { |
343 | all_processes.remove(process); |
344 | } |
345 | all_processes.put(process, core); |
346 | //System.out.println(simulator.time() + ": Putting " + process.getId() + " with demand " + demand + " on core " + core); |
347 | running_processesPerCore.get(core).put(process, demand); |
348 | } |
349 | |
350 | } |