@@ -58,8 +58,6 @@ class JoinOp {
58
58
59
59
private Set uniqueKeys = new LinkedHashSet ()
60
60
61
- private Map<Object ,Object > originalKeyMap = new HashMap ()
62
-
63
61
JoinOp ( DataflowReadChannel source , DataflowReadChannel target , Map params = null ) {
64
62
CheckHelper . checkParams(' join' , params, JOIN_PARAMS )
65
63
this . source = source
@@ -175,24 +173,6 @@ class JoinOp {
175
173
// get the index key for this object
176
174
final item0 = DataflowHelper . makeKey(pivot, data)
177
175
178
- // Store the mapping from normalized key to original key
179
- // Prefer GroupKey over plain keys
180
- def existingOriginal = originalKeyMap. get(item0. keys)
181
- if (existingOriginal == null ) {
182
- originalKeyMap[item0. keys] = item0. originalKeys
183
- } else {
184
- // Check if any of the new original keys is a GroupKey
185
- // If so, replace the existing mapping
186
- for (int i = 0 ; i < item0. originalKeys. size(); i++ ) {
187
- def newKey = item0. originalKeys[i]
188
- def oldKey = existingOriginal instanceof List ? existingOriginal[i] : existingOriginal
189
- if (newKey instanceof GroupKey && ! (oldKey instanceof GroupKey )) {
190
- originalKeyMap[item0. keys] = item0. originalKeys
191
- break
192
- }
193
- }
194
- }
195
-
196
176
// check for unique keys
197
177
checkForDuplicate(item0. keys, item0. values, index, false )
198
178
@@ -210,7 +190,6 @@ class JoinOp {
210
190
def entries = channels[index]
211
191
212
192
// add the received item to the list
213
- // Store the full KeyPair to preserve original keys
214
193
entries << item0
215
194
setSingleton(index, item0. values. size()== 0 )
216
195
@@ -265,16 +244,6 @@ class JoinOp {
265
244
return result
266
245
}
267
246
268
- // Helper method to retrieve original data from buffer
269
- private def getOriginalDataFromBuffer (Map<Object ,Map<Integer ,List > > buffer , Object key , int channelIndex ) {
270
- def channels = buffer. get(key)
271
- if (channels == null ) return null
272
- def items = channels. get(channelIndex)
273
- if (items == null || items. isEmpty()) return null
274
- // Need to reconstruct the original data from the values and the key
275
- // This is a simplified version - in reality we'd need to track the full original items
276
- return null // For now, we'll use a different approach
277
- }
278
247
279
248
private final void checkRemainder (Map<Object ,Map<Integer ,List > > buffers , int count , DataflowWriteChannel target ) {
280
249
log. trace " Operator `join` remainder buffer: ${ -> buffers} "
@@ -310,7 +279,7 @@ class JoinOp {
310
279
}
311
280
312
281
// Use the best available original key, or fall back to the map key
313
- def originalKey = bestOriginalKey ?: originalKeyMap . get(key) ?: key
282
+ def originalKey = bestOriginalKey ?: key
314
283
addToList(result, originalKey)
315
284
316
285
for ( int i= 0 ; i< count; i++ ) {
0 commit comments