Skip to content

Commit ba581f0

Browse files
Merge #76 - Simplify/Fix basketentryoffset processing
A significant amount of on-the-wire space defining partitions is the basketentryoffsets array. Previous commits attempted to lower the ser/de overhead by deduplicating/interning common basketentryoffsets (since, the assumption is that many baskets will have the same offsets if they're stored in the same ROOT cluster).
2 parents 8ebb7bc + 27ce822 commit ba581f0

File tree

1 file changed

+61
-33
lines changed

1 file changed

+61
-33
lines changed

src/main/java/edu/vanderbilt/accre/laurelin/spark_ttree/SlimTBranch.java

Lines changed: 61 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package edu.vanderbilt.accre.laurelin.spark_ttree;
22

3+
import static com.google.common.base.Preconditions.checkNotNull;
34
import static edu.vanderbilt.accre.laurelin.root_proxy.TBranch.entryOffsetToRangeMap;
45

56
import java.io.IOException;
@@ -8,17 +9,21 @@
89
import java.io.ObjectStreamException;
910
import java.io.Serializable;
1011
import java.nio.ByteBuffer;
11-
import java.util.Arrays;
1212
import java.util.HashMap;
1313
import java.util.Map;
1414
import java.util.Map.Entry;
1515

1616
import org.apache.logging.log4j.LogManager;
1717
import org.apache.logging.log4j.Logger;
1818

19+
import com.google.common.cache.CacheBuilder;
20+
import com.google.common.cache.CacheLoader;
21+
import com.google.common.cache.LoadingCache;
1922
import com.google.common.collect.ImmutableMap;
2023
import com.google.common.collect.ImmutableRangeMap;
2124
import com.google.common.collect.ImmutableRangeMap.Builder;
25+
import com.google.common.collect.Interner;
26+
import com.google.common.collect.Interners;
2227
import com.google.common.collect.Range;
2328

2429
import edu.vanderbilt.accre.laurelin.Cache;
@@ -36,6 +41,7 @@
3641
* and byte offsets to each basket
3742
*/
3843
public class SlimTBranch implements Serializable, SlimTBranchInterface, ObjectInputValidation {
44+
private static final Logger logger = LogManager.getLogger();
3945
private static final long serialVersionUID = 1L;
4046
private String path;
4147

@@ -51,19 +57,7 @@ public class SlimTBranch implements Serializable, SlimTBranchInterface, ObjectIn
5157
private int basketStart;
5258
private int basketEnd;
5359

54-
/**
55-
* Deduplicate range->basket maps, since many (all?) of them will be same
56-
* for different branches in a file
57-
*/
58-
private static HashMap<ImmutableRangeMap<Long, Integer>,
59-
ImmutableRangeMap<Long, Integer>> dedupRangeMap =
60-
new HashMap<ImmutableRangeMap<Long, Integer>,
61-
ImmutableRangeMap<Long, Integer>>();
62-
63-
public synchronized ImmutableRangeMap<Long, Integer> dedupAndReturnRangeMap(ImmutableRangeMap<Long, Integer> val) {
64-
dedupRangeMap.putIfAbsent(val, val);
65-
return dedupRangeMap.get(val);
66-
}
60+
private Interner<ImmutableRangeMap<Long, Integer>> rangeMapInterner = Interners.newWeakInterner();
6761

6862
/**
6963
* Copy the given slim branch and trim it by removing unneccessary basket
@@ -88,6 +82,7 @@ public SlimTBranch copyAndTrim(long eventStart, long eventEnd) {
8882
}
8983

9084
public void checkInvariants() {
85+
checkNotNull(rangeToBasketIDMap);
9186
if (basketEnd == 0) {
9287
assert basketEnd != 0;
9388
}
@@ -130,7 +125,9 @@ public SlimTBranch(String path, Range<Long>[] basketRangeList, TBranch.ArrayDesc
130125
int targetBasket = i + basketStart;
131126
basketBuilder = basketBuilder.put(basketRangeList[i], targetBasket);
132127
}
133-
rangeToBasketIDMap = dedupAndReturnRangeMap(basketBuilder.build());
128+
ImmutableRangeMap<Long, Integer> tmp = basketBuilder.build();
129+
checkNotNull(tmp);
130+
rangeToBasketIDMap = rangeMapInterner.intern(tmp);
134131
checkInvariants();
135132
}
136133

@@ -145,6 +142,7 @@ public static SlimTBranch getFromTBranch(TBranch fatBranch) {
145142

146143
@Override
147144
public ImmutableRangeMap<Long, Integer> getRangeToBasketIDMap() {
145+
checkNotNull(rangeToBasketIDMap);
148146
return rangeToBasketIDMap;
149147
}
150148

@@ -430,18 +428,22 @@ public Range<Long>[] getRangeToBasketID() {
430428
return rangeToBasketID;
431429
}
432430

433-
private static class ArrayKeyWrapper<T> {
434-
public T[] val;
431+
private static class TrimBasketKey {
432+
private ImmutableRangeMap<Long, Integer> range;
433+
private int start;
434+
private int end;
435435

436-
public ArrayKeyWrapper(T[] val) {
437-
this.val = val;
436+
public TrimBasketKey(ImmutableRangeMap<Long, Integer> range, int start, int end) {
437+
this.range = range;
438+
this.start = start;
439+
this.end = end;
438440
}
439441

440442
@Override
441443
public int hashCode() {
442444
final int prime = 31;
443-
int result = 1;
444-
result = prime * result + Arrays.deepHashCode(val);
445+
int result = start + end;
446+
result = (prime * result) ^ range.hashCode();
445447
return result;
446448
}
447449

@@ -453,22 +455,45 @@ public boolean equals(Object obj) {
453455
if (obj == null) {
454456
return false;
455457
}
456-
if (!(obj instanceof ArrayKeyWrapper)) {
458+
if (!(obj instanceof TrimBasketKey)) {
457459
return false;
458460
}
459-
ArrayKeyWrapper<T> other = (ArrayKeyWrapper<T>) obj;
460-
return Arrays.deepEquals(val, other.val);
461+
TrimBasketKey other = (TrimBasketKey) obj;
462+
return ((start == other.start) &&
463+
(end == other.end) &&
464+
(range.equals(other.range)));
461465
}
462466
}
463467

464-
private static transient HashMap<ArrayKeyWrapper<Range<Long>>, Range<Long>[]> rangeDedupMap = new
465-
HashMap<ArrayKeyWrapper<Range<Long>>, Range<Long>[]>();
466-
467-
private synchronized Range<Long>[] dedupRange(Range<Long>[] val) {
468-
ArrayKeyWrapper<Range<Long>> key = new ArrayKeyWrapper<Range<Long>>(val);
469-
rangeDedupMap.putIfAbsent(key, val);
470-
return rangeDedupMap.get(key);
471-
}
468+
/**
469+
* Deduplicate range->basket maps, since many (all?) of them will be same
470+
* for different branches in a file. Guessing 2000 as a good cache size
471+
* since that's the upper-bound on the number of branches I'd expect to
472+
* see in a file.
473+
*/
474+
private static LoadingCache<TrimBasketKey,
475+
Range<Long>[]> dedupRangeMap =
476+
CacheBuilder.newBuilder()
477+
.maximumSize(2000)
478+
.softValues()
479+
.build(
480+
new CacheLoader<TrimBasketKey,
481+
Range<Long>[]>() {
482+
@Override
483+
public Range<Long>[] load(TrimBasketKey key) {
484+
ImmutableMap<Range<Long>, Integer> map = key.range.asMapOfRanges();
485+
Range<Long>[] rangeToBasketID = new Range[key.end - key.start];
486+
for (Entry<Range<Long>, Integer> e: map.entrySet()) {
487+
int idx = e.getValue();
488+
if ((idx < key.start) || (idx >= key.end)) {
489+
continue;
490+
}
491+
Range<Long> val = e.getKey();
492+
rangeToBasketID[idx - key.start] = val;
493+
}
494+
return rangeToBasketID;
495+
}
496+
});
472497

473498
protected SerializeStorage(SlimTBranch in) {
474499
in.checkInvariants();
@@ -499,7 +524,9 @@ protected SerializeStorage(SlimTBranch in) {
499524
Range<Long> val = e.getKey();
500525
rangeToBasketID[idx - basketStart] = val;
501526
}
502-
rangeToBasketID = dedupRange(rangeToBasketID);
527+
TrimBasketKey cacheKey = new TrimBasketKey(in.getRangeToBasketIDMap(), basketStart, basketEnd);
528+
rangeToBasketID = dedupRangeMap.getUnchecked(cacheKey);
529+
checkNotNull(rangeToBasketID);
503530
}
504531

505532
/**
@@ -508,6 +535,7 @@ protected SerializeStorage(SlimTBranch in) {
508535
* @throws ObjectStreamException We don't throw, but required by Java in signature
509536
*/
510537
private Object readResolve() throws ObjectStreamException {
538+
checkNotNull(rangeToBasketID);
511539
SlimTBranch ret = new SlimTBranch(path, rangeToBasketID, arrayDesc, basketStart);
512540
int idx = basketStart;
513541
for (long off: basketByteOffsets) {

0 commit comments

Comments
 (0)