@@ -3,266 +3,86 @@ package org.opalj
3
3
package fpcf
4
4
package scheduling
5
5
6
- import org .opalj .collection .IntIterator
7
- import org .opalj .graphs .sccs
8
- import org .opalj .graphs .topologicalSort
9
-
10
6
/**
11
7
* Independent Phase Merge Scheduling (IPMS) Strategy.
12
8
* Merges independent batches to optimize parallelism.
13
9
*/
14
- class IndependentPhaseMergeScheduling [A ](ps : PropertyStore , scheduleLazyTransformerInAllBatches : Boolean )
15
- extends SchedulingStrategy [A ] {
16
-
17
- val name = " IPMS"
18
-
19
- override def schedule (
20
- ps : PropertyStore ,
21
- allCS : Set [ComputationSpecification [A ]]
22
- ): List [PhaseConfiguration [A ]] = {
23
-
24
- // Creates a map from ComputationSpecifications to their indices
25
- val computationSpecificationMap : Map [ComputationSpecification [A ], Int ] = allCS.zipWithIndex.toMap
26
- val computationSpecificationArray : Array [ComputationSpecification [A ]] = allCS.toArray
27
-
28
- // Initializes an empty schedule graph
29
- var scheduleGraph : Map [Int , Set [Int ]] = Map .empty
30
-
31
- // Helper functions and logic for IPMS scheduling follow...
32
- def getAllCSFromPropertyBounds (
33
- properties : Set [PropertyBounds ]
34
- ): Set [ComputationSpecification [A ]] = {
35
- def containsProperty (cs : ComputationSpecification [A ], property : PropertyBounds ): Boolean =
36
- cs.derivesLazily.contains(property) ||
37
- cs.derivesCollaboratively.contains(property) ||
38
- cs.derivesEagerly.contains(property)
39
-
40
- allCS.filter(cs => properties.exists(containsProperty(cs, _)))
41
- }
42
-
43
- def mapCSToNum (specifications : Set [ComputationSpecification [A ]]): Set [Int ] = {
44
- specifications.flatMap(computationSpecificationMap.get)
45
- }
46
-
47
- def edgeFunctionForSCCS (node : Int ): IntIterator = {
48
- val edges = scheduleGraph.getOrElse(node, Set .empty).iterator
49
- new IntIterator {
50
- def hasNext : Boolean = edges.hasNext
51
- def next (): Int = edges.next()
10
+ abstract class IndependentPhaseMergeScheduling extends MaximumPhaseScheduling {
11
+
12
+ override def mergeIndependentBatches (
13
+ batchCount : Int ,
14
+ dependencyGraph : Map [Int , List [Int ]],
15
+ nodeIndexMap : Map [Int , List [Int ]]
16
+ ): (Map [Int , List [Int ]], Map [Int , List [Int ]]) = {
17
+ var transformingMap = nodeIndexMap
18
+ var counter = batchCount
19
+
20
+ def mergeIndependentBatches_rek (
21
+ graph : Map [Int , List [Int ]]
22
+ ): Map [Int , List [Int ]] = {
23
+
24
+ def getUses (batch : Int , graph : Map [Int , List [Int ]]): Set [Int ] = {
25
+ val directUses = graph.getOrElse(batch, List .empty).toSet
26
+ val recursiveUses = directUses.flatMap(otherBatch => getUses(otherBatch, graph))
27
+ directUses ++ recursiveUses
52
28
}
53
- }
54
-
55
- def getAllUses (css : List [Int ]): Set [PropertyBounds ] = {
56
- var allUses : Set [PropertyBounds ] = Set .empty
57
- css.foreach { cs => allUses = allUses ++ computationSpecificationArray(cs).uses(ps) }
58
- allUses
59
- }
60
-
61
- def setLazyInAllBatches (
62
- tmp_aCyclicGraph : Map [List [Int ], Set [Int ]],
63
- firstElement : List [Int ]
64
- ): Map [List [Int ], Set [Int ]] = {
65
- var visited_batches : List [List [Int ]] = List .empty
66
- var aCyclicGraph = tmp_aCyclicGraph
67
-
68
- def setLazyInAllBatches_rek (
69
- tmp_aCyclicGraphInternal : Map [List [Int ], Set [Int ]],
70
- firstElement : List [Int ]
71
- ): Map [List [Int ], Set [Int ]] = {
72
29
73
- if (firstElement.forall(csID =>
74
- computationSpecificationArray(csID).computationType.equals(LazyComputation ) ||
75
- computationSpecificationArray(csID).computationType.equals(Transformer )
76
- )
77
- ) {
78
- var existInSomeBatch = false
79
- tmp_aCyclicGraphInternal.foreach { batch =>
80
- if (batch._2.toList.intersect(firstElement).nonEmpty && batch._1 != firstElement) {
81
- aCyclicGraph = aCyclicGraph + ((batch._1 ++ firstElement) -> mapCSToNum(
82
- getAllCSFromPropertyBounds(getAllUses(batch._1 ++ firstElement))
83
- ).diff((batch._1 ++ firstElement).toSet))
84
- aCyclicGraph = aCyclicGraph - batch._1
85
- existInSomeBatch = true
86
- }
87
- }
88
- if (existInSomeBatch) {
89
- aCyclicGraph = aCyclicGraph - firstElement
90
- setLazyInAllBatches_rek(aCyclicGraph, aCyclicGraph.head._1)
91
- } else {
92
- visited_batches = visited_batches :+ firstElement
93
- val keyList = aCyclicGraph.keys.toSet -- visited_batches
94
- if (keyList.nonEmpty) {
95
- aCyclicGraph = setLazyInAllBatches_rek(aCyclicGraph, keyList.head)
96
- }
97
- }
98
- } else {
99
- visited_batches = visited_batches :+ firstElement
100
- val keyList = aCyclicGraph.keys.toSet -- visited_batches
101
- if (keyList.nonEmpty) {
102
- setLazyInAllBatches_rek(aCyclicGraph, keyList.head)
103
- }
104
- }
105
- aCyclicGraph
30
+ var map : Map [Int , Set [Int ]] = Map .empty
31
+ graph.foreach { batch =>
32
+ val tempUses = getUses(batch._1, graph)
33
+ map = map + (batch._1 -> tempUses)
106
34
}
107
- setLazyInAllBatches_rek(aCyclicGraph, firstElement)
108
- }
109
-
110
- def mergeIndependentBatches (
111
- nodeIndexMap : Map [Int , List [Int ]],
112
- batchCount : Int ,
113
- dependencyGraph : Map [Int , List [Int ]]
114
- ): (Map [Int , List [Int ]], Map [Int , List [Int ]]) = {
115
- var transformingMap = nodeIndexMap
116
- var counter = batchCount
117
-
118
- def mergeIndependentBatches_rek (
119
- graph : Map [Int , List [Int ]]
120
- ): Map [Int , List [Int ]] = {
121
-
122
- var allUses : Set [Int ] = Set .empty
123
-
124
- def getUses (batch : Int ): Set [Int ] = {
125
- val uses = graph.get(batch).head
126
- allUses = allUses ++ uses
127
-
128
- uses.foreach { otherBatch => getUses(otherBatch) }
129
-
130
- val returnUses = allUses
131
- returnUses
132
- }
133
-
134
- var map : Map [Int , Set [Int ]] = Map .empty
135
- graph.foreach { batch =>
136
- val tempUses = getUses(batch._1)
137
- map = map + (batch._1 -> tempUses)
138
- allUses = Set .empty
139
- }
140
-
141
- var couldBeMerged : List [(Int , Int )] = List .empty
142
- map.foreach { batch =>
143
- map.foreach { subBatch =>
144
- if (subBatch != batch) {
145
- if ((! subBatch._2.contains(batch._1)) && (! batch._2.contains(subBatch._1))) {
146
- if (! couldBeMerged.contains((subBatch._1, batch._1))) {
147
- couldBeMerged = couldBeMerged :+ (batch._1, subBatch._1)
148
- }
149
35
36
+ var couldBeMerged : List [(Int , Int )] = List .empty
37
+ map.foreach { batch =>
38
+ map.foreach { subBatch =>
39
+ if (subBatch != batch) {
40
+ if ((! subBatch._2.contains(batch._1)) && (! batch._2.contains(subBatch._1))) {
41
+ if (! couldBeMerged.contains((subBatch._1, batch._1))) {
42
+ couldBeMerged = couldBeMerged :+ (batch._1, subBatch._1)
150
43
}
151
- }
152
-
153
- }
154
-
155
- }
156
-
157
- var updatedGraph : Map [Int , List [Int ]] = graph
158
- if (couldBeMerged.nonEmpty) {
159
- val tempTransformation_2 =
160
- (transformingMap.get(couldBeMerged.head._1).head ++
161
- transformingMap.get(couldBeMerged.head._2).head).distinct
162
- transformingMap =
163
- transformingMap - couldBeMerged.head._1 - couldBeMerged.head._2
164
- transformingMap = transformingMap + (counter -> tempTransformation_2)
165
44
166
- val tempGraph_2 : List [Int ] = (graph.get(couldBeMerged.head._1).head ++
167
- graph.get(couldBeMerged.head._2).head).distinct
168
- updatedGraph = updatedGraph - couldBeMerged.head._1 - couldBeMerged.head._2
169
- updatedGraph = updatedGraph + (counter -> tempGraph_2)
170
-
171
- def replaceIdInMap (oldId : Int , newId : Int ): Unit = {
172
- updatedGraph = updatedGraph.map { case (key, values) =>
173
- key -> values.map(v => if (v == oldId) newId else v)
174
45
}
175
46
}
176
47
177
- replaceIdInMap(couldBeMerged.head._1, counter)
178
- replaceIdInMap(couldBeMerged.head._2, counter)
179
- counter = counter + 1
180
- updatedGraph = mergeIndependentBatches_rek(updatedGraph)
181
48
}
182
- updatedGraph
183
- }
184
- (mergeIndependentBatches_rek(dependencyGraph), transformingMap)
185
- }
186
49
187
- computationSpecificationMap.foreach { csID =>
188
- scheduleGraph += (csID._2 -> mapCSToNum(getAllCSFromPropertyBounds(csID._1.uses(ps))))
189
- }
190
-
191
- if (! scheduleLazyTransformerInAllBatches) {
192
- scheduleGraph.foreach { node =>
193
- if (computationSpecificationArray(node._1).computationType.equals(
194
- LazyComputation
195
- ) || computationSpecificationArray(node._1).computationType.equals(Transformer )
196
- ) {
197
- scheduleGraph.foreach { subNode =>
198
- if (subNode._2.contains(node._1)) {
199
- scheduleGraph =
200
- scheduleGraph +
201
- (node._1 -> (scheduleGraph.get(node._1).head ++ Set (subNode._1)))
202
- }
203
- }
204
- }
205
50
}
206
- }
207
51
208
- var aCyclicGraph = sccs(scheduleGraph.size, edgeFunctionForSCCS)
209
- .map(batch => batch -> mapCSToNum(getAllCSFromPropertyBounds(getAllUses(batch))))
210
- .toMap
211
-
212
- if (scheduleLazyTransformerInAllBatches) {
213
- aCyclicGraph = setLazyInAllBatches(aCyclicGraph, aCyclicGraph.head._1)
214
- }
52
+ var updatedGraph : Map [Int , List [Int ]] = graph
53
+ if (couldBeMerged.nonEmpty) {
54
+ val toBeMerged = nextToMerge(couldBeMerged, transformingMap)
215
55
216
- val graphWithoutSelfDependencies = aCyclicGraph.map { case (nodes, deps) =>
217
- nodes -> (deps -- nodes).toList
218
- }
56
+ val tempTransformation_2 = (transformingMap.get(toBeMerged._1).head ++
57
+ transformingMap.get(toBeMerged._2).head).distinct
58
+ transformingMap =
59
+ transformingMap - toBeMerged._1 - toBeMerged._2
60
+ transformingMap = transformingMap + (counter -> tempTransformation_2)
219
61
220
- var nodeIndexMap : Map [Int , List [Int ]] = Map .empty
221
- var counter = 0
222
- graphWithoutSelfDependencies.foreach { node =>
223
- nodeIndexMap = nodeIndexMap + (counter -> node._1)
224
- counter = counter + 1
225
- }
62
+ val tempGraph_2 : List [Int ] = (graph.get(toBeMerged._1).head ++
63
+ graph.get(toBeMerged._2).head).distinct
64
+ updatedGraph = updatedGraph - toBeMerged._1 - toBeMerged._2
65
+ updatedGraph = updatedGraph + (counter -> tempGraph_2)
226
66
227
- var transformedGraph = graphWithoutSelfDependencies.map { case (node, deps) =>
228
- var dependencies : List [Int ] = List .empty
229
- nodeIndexMap.foreach { tuple =>
230
- if (tuple._2.intersect(deps).nonEmpty) {
231
- dependencies = dependencies :+ tuple._1
67
+ def replaceIdInMap (oldId : Int , newId : Int ): Unit = {
68
+ updatedGraph = updatedGraph.map { case (key, values) =>
69
+ key -> values.map(v => if (v == oldId) newId else v)
70
+ }
232
71
}
233
- }
234
- nodeIndexMap.find(_._2 == node).map(_._1).head -> dependencies
235
- }
236
-
237
- val (newGraph, newTransformingMap) =
238
- mergeIndependentBatches(nodeIndexMap, counter, transformedGraph)
239
- transformedGraph = newGraph
240
- nodeIndexMap = newTransformingMap
241
-
242
- val batchOrder = topologicalSort(transformedGraph)
243
-
244
- var scheduleBatches : List [PhaseConfiguration [A ]] = List .empty
245
72
246
- var alreadyScheduledCS : Set [ComputationSpecification [A ]] = Set .empty
247
- batchOrder.foreach { batch =>
248
- var scheduledInThisPhase : Set [ComputationSpecification [A ]] = Set .empty
249
- nodeIndexMap.get(batch).head.foreach { csID =>
250
- scheduledInThisPhase =
251
- scheduledInThisPhase + computationSpecificationArray(csID)
73
+ replaceIdInMap(toBeMerged._1, counter)
74
+ replaceIdInMap(toBeMerged._2, counter)
75
+ counter = counter + 1
76
+ updatedGraph = mergeIndependentBatches_rek(updatedGraph)
252
77
}
253
-
254
- scheduleBatches = scheduleBatches :+ computePhase(
255
- ps,
256
- scheduledInThisPhase,
257
- allCS -- scheduledInThisPhase -- alreadyScheduledCS
258
- )
259
- alreadyScheduledCS = alreadyScheduledCS ++ scheduledInThisPhase
78
+ updatedGraph
260
79
}
80
+ (mergeIndependentBatches_rek(dependencyGraph), transformingMap)
81
+ }
261
82
262
- scheduleBatches
83
+ def nextToMerge (couldBeMerged : List [(Int , Int )], transformingMap : Map [Int , List [Int ]]): (Int , Int ) = {
84
+ couldBeMerged.head
263
85
}
264
86
}
265
87
266
- object IndependentPhaseMergeScheduling {
267
- val name = " IPMS"
268
- }
88
+ object IndependentPhaseMergeScheduling extends IndependentPhaseMergeScheduling
0 commit comments