1 | package de.uka.ipd.sdq.scheduler.queueing.strategies; |
2 | |
import java.util.ArrayList;
import java.util.Collection;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;

import de.uka.ipd.sdq.scheduler.LoggingWrapper;
import de.uka.ipd.sdq.scheduler.loaddistribution.IInstanceSelector;
import de.uka.ipd.sdq.scheduler.loaddistribution.ILoadBalancer;
import de.uka.ipd.sdq.scheduler.processes.IActiveProcess;
import de.uka.ipd.sdq.scheduler.queueing.IQueueingStrategy;
import de.uka.ipd.sdq.scheduler.queueing.IRunQueue;
import de.uka.ipd.sdq.scheduler.resources.IResourceInstance;
import de.uka.ipd.sdq.scheduler.resources.active.SimResourceInstance;
import de.uka.ipd.sdq.scheduler.strategy.impl.AbstractScheduler;
17 | |
18 | public class MultipleQueuesStrategy implements IQueueingStrategy { |
19 | |
20 | private ILoadBalancer loadBalancer; |
21 | private IInstanceSelector instanceSelector; |
22 | private Hashtable<IResourceInstance, IRunQueue> runQueueTable; |
23 | private boolean in_front_when_balancing; |
24 | |
25 | public MultipleQueuesStrategy(Collection<IResourceInstance> allInstances, |
26 | IRunQueue prototypeRunQueue, |
27 | IInstanceSelector initialInstanceSelector, |
28 | ILoadBalancer loadBalancer, |
29 | boolean in_front_when_balancing) { |
30 | runQueueTable = new Hashtable<IResourceInstance, IRunQueue>(); |
31 | this.instanceSelector = initialInstanceSelector; |
32 | this.loadBalancer = loadBalancer; |
33 | this.in_front_when_balancing = in_front_when_balancing; |
34 | for (IResourceInstance resourceInstance : allInstances) { |
35 | runQueueTable.put(resourceInstance, prototypeRunQueue |
36 | .createNewInstance()); |
37 | } |
38 | } |
39 | |
40 | /** |
41 | * Returns the RunQueue for a resource instance. |
42 | */ |
43 | public IRunQueue getRunQueueFor(IResourceInstance instance) { |
44 | return runQueueTable.get(instance); |
45 | } |
46 | |
47 | /** |
48 | * Returns the next runnable process for the resource instance. |
49 | */ |
50 | |
51 | public IActiveProcess getNextProcessFor(IResourceInstance instance) { |
52 | return getRunQueueFor(instance).getNextRunnableProcess(); |
53 | } |
54 | |
55 | /** |
56 | * Adds a process to the runqueue. The process is added using the strategy |
57 | * of the runqueue. |
58 | * |
59 | * A process is added after its creation or after waiting. |
60 | */ |
61 | |
62 | public void addProcess(IActiveProcess process, IResourceInstance current, boolean inFront) { |
63 | registerProcess(process, current); |
64 | getRunQueueFor(process.getLastInstance()).addProcess(process, inFront); |
65 | } |
66 | |
67 | /** |
68 | * Moves the given process from the runqueue of the src instance to the |
69 | * runqueue of the dest instance. |
70 | * |
71 | * @param process |
72 | * Process to be moved. |
73 | * @param src |
74 | * Source resource instance. |
75 | * @param dest |
76 | * Destination resource instance. |
77 | */ |
78 | public void move(IActiveProcess process, IResourceInstance src, |
79 | IResourceInstance dest) { |
80 | assert process.getLastInstance().equals(src); |
81 | assert getRunQueueFor(src).contains(process) : "Process '" + process |
82 | + "' is not in the runqueue of '" + src + "'"; |
83 | assert process.getRunQueue() == getRunQueueFor(src) : "Invalid state of runqueues!"; |
84 | |
85 | LoggingWrapper.log("Moving " + process + " from " + src + " to " + dest); |
86 | |
87 | double waiting = getRunQueueFor(src).getWaitingTime(process); |
88 | getRunQueueFor(src).removeProcess(process); |
89 | getRunQueueFor(dest).addProcess(process, in_front_when_balancing); |
90 | getRunQueueFor(dest).setWaitingTime(process, waiting); |
91 | process.wasMovedTo(dest); |
92 | } |
93 | |
94 | |
95 | public void activelyBalance(IResourceInstance instance) { |
96 | loadBalancer.activelyBalance(instance); |
97 | } |
98 | |
99 | public Collection<IResourceInstance> getResourceInstances() { |
100 | return this.runQueueTable.keySet(); |
101 | } |
102 | |
103 | public boolean isIdle(IResourceInstance instance) { |
104 | return getRunQueueFor(instance).isEmpty(); |
105 | } |
106 | |
107 | /** |
108 | * Returns all queues without jobs. |
109 | * |
110 | * @param runQueueCollection |
111 | * @return |
112 | */ |
113 | public List<IResourceInstance> getIdleInstances() { |
114 | List<IResourceInstance> idleInstances = new ArrayList<IResourceInstance>(); |
115 | for (IResourceInstance instance : getResourceInstances()) { |
116 | if (isIdle(instance)) |
117 | idleInstances.add(instance); |
118 | } |
119 | return idleInstances; |
120 | } |
121 | |
122 | |
123 | public boolean removePendingProcess(IActiveProcess process) { |
124 | return getRunQueueFor(process.getLastInstance()).removePendingProcess( |
125 | process); |
126 | } |
127 | |
128 | |
129 | public boolean containsPending(IActiveProcess process) { |
130 | return getRunQueueFor(process.getLastInstance()).containsPending( |
131 | process); |
132 | } |
133 | |
134 | |
135 | public void removeRunning(IActiveProcess process) { |
136 | getRunQueueFor(process.getLastInstance()).removeRunning(process); |
137 | } |
138 | |
139 | |
140 | public IResourceInstance runningOn(IActiveProcess process) { |
141 | for (IResourceInstance instance : runQueueTable.keySet()) { |
142 | if (runQueueTable.get(instance).containsRunning(process)) { |
143 | return instance; |
144 | } |
145 | } |
146 | return null; |
147 | } |
148 | |
149 | |
150 | public void setRunningOn(IActiveProcess process, IResourceInstance instance) { |
151 | getRunQueueFor(instance).setRunningOn(process, instance); |
152 | } |
153 | |
154 | public void forkProcess(IActiveProcess process, IResourceInstance current, |
155 | boolean inFront) { |
156 | addProcess(process, current, inFront); |
157 | loadBalancer.onFork(current); |
158 | } |
159 | |
160 | public void registerProcess(IActiveProcess process, IResourceInstance current) { |
161 | IResourceInstance instance = process.getLastInstance(); |
162 | if (instance == null) { |
163 | instance = instanceSelector.selectInstanceFor(process,current); |
164 | process.setLastInstance(instance); |
165 | process.setIdealInstance(instance); |
166 | } |
167 | } |
168 | |
169 | public void fromRunningToWaiting(IActiveProcess process) { |
170 | removeRunning(process); |
171 | } |
172 | |
173 | public void onSleep(IResourceInstance lastInstance) { |
174 | loadBalancer.onSleep(lastInstance); |
175 | } |
176 | |
177 | public void terminateProcess(IActiveProcess process) { |
178 | removePendingProcess(process); |
179 | loadBalancer.onTerminate(process.getLastInstance()); |
180 | } |
181 | |
182 | public void fromWaitingToReady(IActiveProcess process, |
183 | IResourceInstance current, boolean in_front_after_waiting) { |
184 | addProcess(process, current, in_front_after_waiting); |
185 | |
186 | loadBalancer.onWake(current); |
187 | } |
188 | |
189 | public List<IActiveProcess> getStarvingProcesses( |
190 | IResourceInstance instance, double starvationLimit){ |
191 | IRunQueue runQ = getRunQueueFor(instance); |
192 | return runQ.getStarvingProcesses(starvationLimit); |
193 | } |
194 | |
195 | public void resetStarvationInfo() { |
196 | for(IRunQueue q : this.runQueueTable.values()){ |
197 | q.resetStarvationInfo(); |
198 | } |
199 | } |
200 | |
201 | public int getQueueLengthFor(SimResourceInstance simResourceInstance) { |
202 | return getRunQueueFor(simResourceInstance).getCurrentLoad(); |
203 | } |
204 | |
205 | |
206 | } |