cache preset dict for LZ4WithPresetDictDecompressor #14397

Open · wants to merge 2 commits into main
lucene/CHANGES.txt (2 additions, 0 deletions)

@@ -439,6 +439,8 @@ Optimizations
 
 * GITHUB#14304: Add SIMD optimizations for scalar quantized queries and indexing. (Simon Cooper)
 
+* GITHUB#14397: Cache preset dict for LZ4WithPresetDictDecompressor. (kkewwei)
+
 Bug Fixes
 ---------------------
lucene/core/src/java/org/apache/lucene/codecs/compressing/Decompressor.java

@@ -44,4 +44,6 @@ public abstract void decompress(
 
   @Override
   public abstract Decompressor clone();
+
+  public void reset() {}
 }
lucene/core/src/java/org/apache/lucene/codecs/lucene90/LZ4WithPresetDictCompressionMode.java

@@ -64,6 +64,7 @@ private static final class LZ4WithPresetDictDecompressor extends Decompressor {
 
     private int[] compressedLengths;
     private byte[] buffer;
+    private boolean reused = false;
 
     LZ4WithPresetDictDecompressor() {
       compressedLengths = new int[0];
@@ -72,16 +73,15 @@ private static final class LZ4WithPresetDictDecompressor extends Decompressor {
 
     private int readCompressedLengths(
         DataInput in, int originalLength, int dictLength, int blockLength) throws IOException {
-      in.readVInt(); // compressed length of the dictionary, unused
-      int totalLength = dictLength;
+      compressedLengths = ArrayUtil.growNoCopy(compressedLengths, originalLength / blockLength + 2);
       int i = 0;
-      compressedLengths = ArrayUtil.growNoCopy(compressedLengths, originalLength / blockLength + 1);
+      compressedLengths[i++] = in.readVInt(); // compressed length of the dictionary
+      int totalLength = dictLength;
       while (totalLength < originalLength) {
         compressedLengths[i++] = in.readVInt();
         totalLength += blockLength;
       }
-      return i;
+      return i - 1;
     }
 
     @Override
@@ -98,12 +98,17 @@ public void decompress(DataInput in, int originalLength, int offset, int length,
       final int blockLength = in.readVInt();
 
       final int numBlocks = readCompressedLengths(in, originalLength, dictLength, blockLength);
 
-      buffer = ArrayUtil.growNoCopy(buffer, dictLength + blockLength);
       bytes.length = 0;
-      // Read the dictionary
-      if (LZ4.decompress(in, dictLength, buffer, 0) != dictLength) {
-        throw new CorruptIndexException("Illegal dict length", in);
+      if (reused) {
+        assert buffer.length >= dictLength + blockLength;
+        in.skipBytes(compressedLengths[0]);
+      } else {
+        // Read the dictionary
+        buffer = ArrayUtil.growNoCopy(buffer, dictLength + blockLength);
+        if (LZ4.decompress(in, dictLength, buffer, 0) != dictLength) {
+          throw new CorruptIndexException("Illegal dict length", in);
+        }
+        reused = true;
jainankitk (Contributor) commented on lines +102 to +111 · Mar 31, 2025:

I am wondering if we should consider exposing a metric (a simple counter, maybe) for how many times we could reuse the cached dictionary and how many times we had to read it from disk. That would provide some useful insight into the usefulness of this change.
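
A minimal sketch of what such a counter might look like (plain Java; the class and method names are hypothetical, not part of this PR):

import java.util.concurrent.atomic.LongAdder;

// Hypothetical instrumentation, not part of this PR: counts how often the
// cached preset dict is reused versus decompressed again from the stream.
final class PresetDictCacheStats {
  private final LongAdder reused = new LongAdder();
  private final LongAdder loaded = new LongAdder();

  void onReuse() {
    reused.increment(); // the dict was served from the cached buffer
  }

  void onLoad() {
    loaded.increment(); // the dict had to be decompressed from disk
  }

  long reuseCount() {
    return reused.sum();
  }

  long loadCount() {
    return loaded.sum();
  }
}

The decompressor would bump onReuse() in the if (reused) branch above and onLoad() in the else branch.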

       }
 
       int offsetInBlock = dictLength;
@@ -114,7 +119,7 @@ public void decompress(DataInput in, int originalLength, int offset, int length,
       // Skip unneeded blocks
       int numBytesToSkip = 0;
       for (int i = 0; i < numBlocks && offsetInBlock + blockLength < offset; ++i) {
-        int compressedBlockLength = compressedLengths[i];
+        int compressedBlockLength = compressedLengths[i + 1];
         numBytesToSkip += compressedBlockLength;
         offsetInBlock += blockLength;
         offsetInBytesRef -= blockLength;
@@ -148,6 +153,11 @@ public void decompress(DataInput in, int originalLength, int offset, int length,
     public Decompressor clone() {
       return new LZ4WithPresetDictDecompressor();
     }
+
+    @Override
+    public void reset() {
+      reused = false;
+    }
   }
 
   private static class LZ4WithPresetDictCompressor extends Compressor {
lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingStoredFieldsReader.java

@@ -511,6 +511,7 @@ private void doReset(int docID) throws IOException {
       bytes.offset = bytes.length = 0;
       for (int decompressed = 0; decompressed < totalLength; ) {
         final int toDecompress = Math.min(totalLength - decompressed, chunkSize);
+        decompressor.reset();
         decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, spare);
A Contributor commented on lines +514 to 515:

I am wondering if reset should be the default behavior. We could instead pass another flag to indicate whether reuse is possible.

kkewwei (Contributor, Author) replied:

It seems that reset is essential. When the block changes, we must discard the cache promptly, and this change can only be detected from outside the decompressor.

The Contributor replied:

> When the block changes, we must discard the cache promptly, and this change can only be detected from outside the decompressor.

I am not questioning that. My point is not to have a reset method in the Decompressor interface, but to add another decompress method that takes reuseIfPossible as one of its parameters. That ensures functional correctness even if we never make the reset call somewhere in the code, and it still allows explicit optimization wherever we deem appropriate. The risk of not explicitly making the reset call is much greater than the cost of using the original decompress without reuse:

public abstract class Decompressor implements Cloneable {

  protected Decompressor() {}

  public void decompress(
      DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException {
    decompress(in, originalLength, offset, length, bytes, false);
  }

  public abstract void decompress(
      DataInput in, int originalLength, int offset, int length, BytesRef bytes, boolean reuseIfPossible) throws IOException;

  @Override
  public abstract Decompressor clone();
}

kkewwei (Contributor, Author) replied · Apr 17, 2025:
I tried relying only on the outer reuseIfPossible flag to decide whether to cache the preset dict, but it failed: in the following case, the caller must still invoke reset to clear the cache.

We have two chunks:

  1. chunk0 [doc0(length>0)]
  2. chunk1 [doc0(length=0), doc1(length=1)]

The steps are as follows:

  1. Read chunk0/doc0 with reuseIfPossible=false.
  2. Read chunk1/doc0 with reuseIfPossible=false. As the length is 0, Lucene will not read the preset dict, so the preset dict is not cached.
  3. Read chunk1/doc1. doc1 is in the current chunk1, so reuseIfPossible=true, but the preset dict has not been cached yet, and Lucene will throw an exception.

In this case, we would still have to call reset at step 1; the toy model below walks through the same sequence.
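
A self-contained toy model of that failure (plain Java; PresetDictCacheToy and its members are made-up names, not Lucene classes), assuming the proposed decompress(..., reuseIfPossible) semantics with no reset call:

// Toy model, not Lucene code: the cache is implicitly "whatever dict was
// cached last", so a zero-length read leaves a stale entry behind.
final class PresetDictCacheToy {
  private Integer cachedDictChunk = null; // which chunk's dict is cached, if any

  void decompress(int chunk, int length, boolean reuseIfPossible) {
    if (length == 0) {
      return; // nothing to decompress, so this chunk's dict is never read or cached
    }
    if (reuseIfPossible && cachedDictChunk != null) {
      if (cachedDictChunk != chunk) {
        throw new IllegalStateException("stale preset dict from chunk " + cachedDictChunk);
      }
      // decode against the cached dict
    } else {
      cachedDictChunk = chunk; // read this chunk's dict and cache it
    }
  }

  public static void main(String[] args) {
    PresetDictCacheToy toy = new PresetDictCacheToy();
    toy.decompress(0, 5, false); // step 1: chunk0/doc0, chunk0's dict gets cached
    toy.decompress(1, 0, false); // step 2: chunk1/doc0, length 0, chunk1's dict is never cached
    toy.decompress(1, 1, true);  // step 3: chunk1/doc1 asks for reuse and hits the stale chunk0 dict
  }
}

Without an explicit reset after step 1, step 3 can only fail or decode against the wrong dictionary.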

       bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + spare.length);
       System.arraycopy(spare.bytes, spare.offset, bytes.bytes, bytes.length, spare.length);

@@ -559,6 +560,7 @@ SerializedDocument document(int docID) throws IOException {
       documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);
     } else if (sliced) {
       fieldsStream.seek(startPointer);
+      decompressor.reset();
       decompressor.decompress(
           fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
       documentInput =
@@ -572,6 +574,7 @@ void fillBuffer() throws IOException {
         throw new EOFException();
       }
       final int toDecompress = Math.min(length - decompressed, chunkSize);
+      decompressor.reset();
       decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
       decompressed += toDecompress;
     }
@@ -643,6 +646,7 @@ SerializedDocument serializedDocument(int docID) throws IOException {
     if (state.contains(docID) == false) {
       fieldsStream.seek(indexReader.getStartPointer(docID));
       state.reset(docID);
+      decompressor.reset();
     }
     assert state.contains(docID);
     return state.document(docID);
lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsReader.java

@@ -367,6 +367,7 @@ public Fields get(int doc) throws IOException {
       startPointer = blockState.startPointer; // avoid searching the start pointer
     } else {
       startPointer = indexReader.getStartPointer(doc);
+      decompressor.reset();
     }
     vectorsStream.seek(startPointer);