| 1 | package de.uka.ipd.sdq.scheduler.resources.active.special; |
| 2 | |
| 3 | import java.util.ArrayList; |
| 4 | import java.util.Hashtable; |
| 5 | import java.util.Random; |
| 6 | import java.util.Map.Entry; |
| 7 | |
| 8 | import de.uka.ipd.sdq.probfunction.math.util.MathTools; |
| 9 | import de.uka.ipd.sdq.scheduler.IRunningProcess; |
| 10 | import de.uka.ipd.sdq.scheduler.ISchedulableProcess; |
| 11 | import de.uka.ipd.sdq.scheduler.LoggingWrapper; |
| 12 | import de.uka.ipd.sdq.scheduler.SchedulerModel; |
| 13 | import de.uka.ipd.sdq.scheduler.resources.active.AbstractActiveResource; |
| 14 | import de.uka.ipd.sdq.scheduler.resources.active.SimResourceInstance; |
| 15 | import de.uka.ipd.sdq.simulation.abstractsimengine.AbstractSimEventDelegator; |
| 16 | import de.uka.ipd.sdq.simulation.abstractsimengine.IEntity; |
| 17 | import de.uka.ipd.sdq.simulation.abstractsimengine.NullEntity; |
| 18 | |
| 19 | /** |
| 20 | * This class is for testing purposes only. It is used for the MASCOTS paper case study. |
| 21 | * @author hauck |
| 22 | * |
| 23 | */ |
| 24 | public class SimProcessorSharingResourceLinuxO1 extends AbstractActiveResource { |
| 25 | |
| 26 | private class DoLoadBalancingEvent extends AbstractSimEventDelegator<NullEntity> { |
| 27 | |
| 28 | public DoLoadBalancingEvent(SchedulerModel model) { |
| 29 | super(model, SimProcessorSharingResourceLinuxO1.class.getName()); |
| 30 | } |
| 31 | |
| 32 | |
| 33 | @Override |
| 34 | public void eventRoutine(NullEntity who) { |
| 35 | //System.out.println(simulator.time() + ": Trying load balancing..."); |
| 36 | int coreToBalanceTo = getCoreWithShortestQueue(); |
| 37 | int coreToBalanceFrom = getCoreWithLongestQueue(); |
| 38 | if ((running_processesPerCore.get(coreToBalanceFrom).size() - running_processesPerCore.get(coreToBalanceTo).size()) > 1) { |
| 39 | // We have two cores that have a running queue length difference greater than 1. Do load balancing. |
| 40 | |
| 41 | // select a random process from the sender core |
| 42 | Hashtable<ISchedulableProcess, Double> runningProcesses = running_processesPerCore.get(coreToBalanceFrom); |
| 43 | ISchedulableProcess[] processes = runningProcesses.keySet().toArray(new ISchedulableProcess[]{}); |
| 44 | |
| 45 | // move random process from sender core to idle core |
| 46 | Random random = new Random(); |
| 47 | ISchedulableProcess processToBalance = processes[random.nextInt(processes.length)]; |
| 48 | double simTime = getModel().getSimulationControl().getCurrentSimulationTime(); |
| 49 | System.out.println(simTime + ": Balancing process: " + processToBalance.getId() + " from core " + coreToBalanceFrom + " to " + coreToBalanceTo); |
| 50 | Double processValue = runningProcesses.get(processToBalance); |
| 51 | runningProcesses.remove(processToBalance); |
| 52 | putProcessOnCore(processToBalance, processValue, coreToBalanceTo); |
| 53 | |
| 54 | } else { |
| 55 | // System.out.println(simulator.time() + ": No load balancing needed."); |
| 56 | } |
| 57 | |
| 58 | } |
| 59 | |
| 60 | } |
| 61 | |
| 62 | private class ProcessingFinishedEvent extends AbstractSimEventDelegator<ISchedulableProcess> { |
| 63 | |
| 64 | public ProcessingFinishedEvent(SchedulerModel model) { |
| 65 | super(model, ProcessingFinishedEvent.class.getName()); |
| 66 | } |
| 67 | |
| 68 | @Override |
| 69 | public void eventRoutine(ISchedulableProcess process) { |
| 70 | ISchedulableProcess last = process; |
| 71 | toNow(); |
| 72 | // NEW |
| 73 | int core = getCoreOfARunningProcess(last); |
| 74 | running_processesPerCore.get(core).remove(last); |
| 75 | // running_processes.remove(last); |
| 76 | // TODO: now, we have to check, if we have to perform load |
| 77 | // balancing. |
| 78 | // And probably re-calculate times? |
| 79 | |
| 80 | // suggestion: |
| 81 | if (running_processesPerCore.get(core).size() == 0) { |
| 82 | int coreToBalanceFrom = getCoreWithLongestQueue(); |
| 83 | if (running_processesPerCore.get(coreToBalanceFrom).size() <= 1) { |
| 84 | // all cores are idle or have no contention |
| 85 | } else { |
| 86 | // Try load balancing one time unit from now |
| 87 | DoLoadBalancingEvent event = new DoLoadBalancingEvent(SimProcessorSharingResourceLinuxO1.this |
| 88 | .getModel()); |
| 89 | double simTime = getModel().getSimulationControl().getCurrentSimulationTime(); |
| 90 | event.schedule(IEntity.NULL, simTime+1); |
| 91 | } |
| 92 | } |
| 93 | // System.out.println(simulator.time() + ": " + last.getId() + " finished"); |
| 94 | // LoggingWrapper.log(last + " finished."); |
| 95 | scheduleNextEvent(); |
| 96 | last.activate(); |
| 97 | } |
| 98 | |
| 99 | } |
| 100 | |
| 101 | private ProcessingFinishedEvent processingFinished = new ProcessingFinishedEvent(null); |
| 102 | private ArrayList<Hashtable<ISchedulableProcess,Double>> running_processesPerCore = new ArrayList<Hashtable<ISchedulableProcess, Double>>(); |
| 103 | // private Hashtable<ISchedulableProcess,Double> running_processes = new |
| 104 | // Hashtable<ISchedulableProcess, Double>(); |
| 105 | private double last_time; |
| 106 | private int coreToUseForInitialLoadBalancing = 0; |
| 107 | |
| 108 | public SimProcessorSharingResourceLinuxO1(SchedulerModel model, String name, String id, int numberOfCores) { |
| 109 | super(model, numberOfCores, name, id); |
| 110 | for (int j=0; j<numberOfCores; j++) { |
| 111 | running_processesPerCore.add(new Hashtable<ISchedulableProcess, Double>()); |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | |
| 116 | |
| 117 | public void scheduleNextEvent() { |
| 118 | /** |
| 119 | * New: look in all queues, i.e. in all nested running_processes |
| 120 | * hashtables, which process is to be scheduled next. |
| 121 | */ |
| 122 | ISchedulableProcess shortest = null; |
| 123 | Double shortestTime = 0.0; |
| 124 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
| 125 | for (ISchedulableProcess process : running_processes.keySet()) { |
| 126 | // System.out.println("Time: " + simulator.time() + ", looking for shortest time: " + process.getId() + " time: " + running_processes.get(process) + ", speed: " + getSpeed(process)); |
| 127 | if (shortest == null || shortestTime > running_processes.get(process) * getSpeed(process)){ |
| 128 | shortest = process; |
| 129 | shortestTime = running_processes.get(process) * getSpeed(process); |
| 130 | // System.out.println("Shortest: " + shortest.getId() + ", shortest time: " + shortestTime); |
| 131 | } |
| 132 | } |
| 133 | } |
| 134 | |
| 135 | processingFinished.removeEvent(); |
| 136 | if (shortest!=null){ |
| 137 | // New: calculate time for process |
| 138 | double time = shortestTime;// * getSpeed(shortest); |
| 139 | // double time = running_processes.get(shortest) * getSpeed(); |
| 140 | // System.out.println("Time: " + simulator.time() + ", scheduling event at " + time); |
| 141 | if (!MathTools.less(0, time)) { |
| 142 | time = 0.0; |
| 143 | } |
| 144 | processingFinished.schedule(shortest, time); |
| 145 | } |
| 146 | } |
| 147 | |
| 148 | private int getCoreOfARunningProcess(ISchedulableProcess process) { |
| 149 | for (int i=0; i<running_processesPerCore.size(); i++) { |
| 150 | Hashtable<ISchedulableProcess, Double> running_processes = running_processesPerCore.get(i); |
| 151 | if (running_processes.containsKey(process)) { |
| 152 | return i; |
| 153 | } |
| 154 | } |
| 155 | LoggingWrapper.logger.warn("Core of process not found. Returning core 0."); |
| 156 | return 0; |
| 157 | } |
| 158 | |
| 159 | private int getCoreWithLongestQueue() { |
| 160 | int coreWithLongestQueue = 0; |
| 161 | int queueSize = 0; |
| 162 | for (int i=0; i<running_processesPerCore.size(); i++) { |
| 163 | if (running_processesPerCore.get(i).size() > queueSize) { |
| 164 | queueSize = running_processesPerCore.get(i).size(); |
| 165 | coreWithLongestQueue = i; |
| 166 | } |
| 167 | } |
| 168 | return coreWithLongestQueue; |
| 169 | } |
| 170 | |
| 171 | private int getCoreWithShortestQueue() { |
| 172 | int coreWithShortestQueue = -1; |
| 173 | int queueSize = 0; |
| 174 | for (int i=0; i<running_processesPerCore.size(); i++) { |
| 175 | if (coreWithShortestQueue == -1) { |
| 176 | queueSize = running_processesPerCore.get(i).size(); |
| 177 | coreWithShortestQueue = i; |
| 178 | } else { |
| 179 | if (running_processesPerCore.get(i).size() < queueSize) { |
| 180 | queueSize = running_processesPerCore.get(i).size(); |
| 181 | coreWithShortestQueue = i; |
| 182 | } |
| 183 | } |
| 184 | } |
| 185 | return coreWithShortestQueue; |
| 186 | } |
| 187 | |
| 188 | |
| 189 | private void toNow() { |
| 190 | double now = getModel().getSimulationControl().getCurrentSimulationTime(); |
| 191 | double passed_time = now - last_time; |
| 192 | // System.out.println("toNow: " + now + " - " + last_time + " = " + |
| 193 | // passed_time); |
| 194 | if (MathTools.less(0, passed_time)){ |
| 195 | // passed_time /= getSpeed(); |
| 196 | // NEW |
| 197 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
| 198 | for (Entry<ISchedulableProcess,Double> e : running_processes.entrySet()) { |
| 199 | double processPassedTime = passed_time / getSpeed(e.getKey()); |
| 200 | double rem = e.getValue() - processPassedTime; |
| 201 | // System.out.println("toNow " + e.getKey().getId() + ": " + |
| 202 | // e.getValue() + " - " + processPassedTime + " = " + rem); |
| 203 | e.setValue(rem); |
| 204 | } |
| 205 | } |
| 206 | } |
| 207 | last_time = now; |
| 208 | } |
| 209 | |
| 210 | @Override |
| 211 | public double getRemainingDemand(ISchedulableProcess process) { |
| 212 | boolean hasDemand = false; |
| 213 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
| 214 | if (running_processes.containsKey(process)) { |
| 215 | hasDemand = true; |
| 216 | break; |
| 217 | } |
| 218 | } |
| 219 | if (hasDemand == false) { |
| 220 | return 0.0; |
| 221 | } |
| 222 | toNow(); |
| 223 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
| 224 | if (!running_processes.contains(process)) { |
| 225 | return running_processes.get(process); |
| 226 | } |
| 227 | } |
| 228 | // Should not be reached. |
| 229 | return 0.0; |
| 230 | } |
| 231 | |
| 232 | @Override |
| 233 | public void updateDemand(ISchedulableProcess process, double demand) { |
| 234 | for (Hashtable<ISchedulableProcess, Double> running_processes : running_processesPerCore) { |
| 235 | for (Entry<ISchedulableProcess,Double> e : running_processes.entrySet()) { |
| 236 | if (e.getKey().equals(process)) { |
| 237 | e.setValue(demand); |
| 238 | break; |
| 239 | } |
| 240 | } |
| 241 | } |
| 242 | scheduleNextEvent(); |
| 243 | } |
| 244 | |
| 245 | // New: calculate speed for a process. |
| 246 | private double getSpeed(ISchedulableProcess process) { |
| 247 | int core = getCoreOfARunningProcess(process); |
| 248 | double speed = (double)running_processesPerCore.get(core).size(); |
| 249 | // double speed = (double)running_processes.size() / |
| 250 | // (double)getCapacity(); |
| 251 | |
| 252 | // comparison here is unnecessary, speed cannot be lower than 1. Keep it |
| 253 | // anyway. |
| 254 | return speed < 1.0 ? 1.0 : speed; |
| 255 | } |
| 256 | |
| 257 | |
| 258 | public void start() { |
| 259 | } |
| 260 | |
| 261 | |
| 262 | @Override |
| 263 | protected void dequeue(ISchedulableProcess process) { |
| 264 | } |
| 265 | |
| 266 | |
| 267 | @Override |
| 268 | protected void doProcessing(ISchedulableProcess process, int resourceServiceID, double demand) { |
| 269 | toNow(); |
| 270 | LoggingWrapper.log("PS: " + process + " demands " + demand); |
| 271 | //System.out.println("PS: " + process.getId() + " demands " + demand); |
| 272 | |
| 273 | int coreToPutOn = getLastCoreProcessWasRunningOn(process); |
| 274 | if (coreToPutOn == -1) { |
| 275 | // This is a new process which has issued demand for the first time. |
| 276 | // Same as parent: always use core 0. Check if load balancing has to be done afterwards. |
| 277 | // If there are tasks running on core 0, use a random core. |
| 278 | coreToUseForInitialLoadBalancing = 0; |
| 279 | if (running_processesPerCore.get(coreToUseForInitialLoadBalancing).size() > 0) { |
| 280 | coreToUseForInitialLoadBalancing = new Random().nextInt(getCapacity()); |
| 281 | } |
| 282 | putProcessOnCore(process, demand, coreToUseForInitialLoadBalancing); |
| 283 | // running_processes.put(process, demand); |
| 284 | int coreToBalanceFrom = getCoreWithLongestQueue(); |
| 285 | if (running_processesPerCore.get(coreToBalanceFrom).size() <= 1) { |
| 286 | // all cores are idle or have no contention |
| 287 | } else { |
| 288 | // Try load balancing one time unit from now |
| 289 | DoLoadBalancingEvent event = new DoLoadBalancingEvent(getModel()); |
| 290 | double simTime = getModel().getSimulationControl().getCurrentSimulationTime(); |
| 291 | event.schedule(IEntity.NULL, simTime+1); |
| 292 | } |
| 293 | } else { |
| 294 | putProcessOnCore(process, demand, coreToPutOn); |
| 295 | // running_processesPerCore.get(coreToPutOn).put(process, demand); |
| 296 | } |
| 297 | |
| 298 | // I don't know if this is right here. |
| 299 | // I call toNow() again to update all processes (some processes might |
| 300 | // now share the core with the new process) |
| 301 | toNow(); |
| 302 | scheduleNextEvent(); |
| 303 | process.passivate(); |
| 304 | } |
| 305 | |
| 306 | @Override |
| 307 | protected void enqueue(ISchedulableProcess process) { |
| 308 | } |
| 309 | |
| 310 | |
| 311 | public void stop() { |
| 312 | |
| 313 | } |
| 314 | |
| 315 | public void registerProcess(IRunningProcess runningProcess) { |
| 316 | } |
| 317 | |
| 318 | |
| 319 | public int getQueueLengthFor(SimResourceInstance simResourceInstance) { |
| 320 | // TODO where is this needed? Return hard coded queue length of first |
| 321 | // core. |
| 322 | return this.running_processesPerCore.get(0).size(); |
| 323 | // return this.running_processes.size(); |
| 324 | } |
| 325 | |
| 326 | private Hashtable<ISchedulableProcess,Integer> all_processes = new Hashtable<ISchedulableProcess, Integer>(); |
| 327 | |
| 328 | /** |
| 329 | * return -1 if a process was not running before, i.e. is a new process. |
| 330 | * |
| 331 | * @param process |
| 332 | * @return |
| 333 | */ |
| 334 | private int getLastCoreProcessWasRunningOn(ISchedulableProcess process) { |
| 335 | if (all_processes.containsKey(process)) { |
| 336 | return all_processes.get(process); |
| 337 | } |
| 338 | return -1; |
| 339 | } |
| 340 | |
| 341 | private void putProcessOnCore(ISchedulableProcess process, double demand, int core) { |
| 342 | if (all_processes.containsKey(process)) { |
| 343 | all_processes.remove(process); |
| 344 | } |
| 345 | all_processes.put(process, core); |
| 346 | //System.out.println(simulator.time() + ": Putting " + process.getId() + " with demand " + demand + " on core " + core); |
| 347 | running_processesPerCore.get(core).put(process, demand); |
| 348 | } |
| 349 | |
| 350 | } |