
Large insert into sorted Iceberg table fails #24376

@agoyal-sfdc

Description


For large inserts (on the order of tens to hundreds of billions of rows), we see two distinct issues.

Failure due to S3 throttling while writing buffer files

S3 throttles Trino when it performs PUT/GET/DELETE operations on the buffer files used for sorting. This leads to long-running queries that can fail at any time.

For normal data files, the solution is the write.object-storage.enabled table property. It adds a random hash to the key prefix, which lets S3 distribute load across more partitions of its keyspace. Until recently (and in 435, the version I am using), this had to be enabled using a system procedure, and Trino would then create an ObjectStoreLocationProvider instead of a DefaultLocationProvider during writes:

return new ObjectStoreLocationProvider(tableLocation, storageProperties);
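
For context, the surrounding selection logic looks roughly like this (a minimal sketch based on Iceberg's LocationProviders; the exact property lookup is approximate):

// Sketch: Iceberg chooses the location provider based on
// write.object-storage.enabled (TableProperties.OBJECT_STORE_ENABLED),
// falling back to the plain layout when it is unset.
boolean objectStorageEnabled = PropertyUtil.propertyAsBoolean(
        storageProperties, TableProperties.OBJECT_STORE_ENABLED, false);
if (objectStorageEnabled) {
    return new ObjectStoreLocationProvider(tableLocation, storageProperties);
}
return new DefaultLocationProvider(tableLocation, storageProperties);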

(This can now be set at the Trino level too, with #20555, but that's unrelated)

However, this table property does not take effect for the buffer files used while sorting, because the random hash prefix ends up before trino-tmp-files.

The Location for trino-tmp-files (where buffer files are created) is instantiated here:

this.tempDirectory = Location.of(locationProvider.newDataLocation("trino-tmp-files"));

A hash is computed, but it is placed before the filename (which, for the temp directory, is trino-tmp-files):

String hash = computeHash(filename);
if (context != null) {
    return "%s/%s/%s/%s".formatted(storageLocation, hash, context, filename);
}
return "%s/%s/%s".formatted(storageLocation, hash, filename);

Buffer files then simply append to this path:

Location tempFilePrefix = tempDirectory.appendPath(tempName);
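
To make the effect concrete, here is roughly what the resulting S3 keys look like (bucket, table location, and file names are made up):

// Data files: the hash varies per file, so writes spread across S3 prefixes
s3://bucket/warehouse/t/data/k8Yt2a/date=2024-01-01/20240101_000000_00000_abcde.parquet
s3://bucket/warehouse/t/data/Qw3xZp/date=2024-01-01/20240101_000000_00000_fghij.parquet
// Buffer files: the hash is derived from the fixed name "trino-tmp-files",
// so every buffer file lands under one and the same prefix
s3://bucket/warehouse/t/data/mN4vBy/trino-tmp-files/sort-buffer-0
s3://bucket/warehouse/t/data/mN4vBy/trino-tmp-files/sort-buffer-1

Since all buffer traffic hits a single prefix, S3's per-prefix request limits apply to the entire sort, which is what triggers the throttling.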

Potential solutions:

  1. Update the logic so that, when the write.object-storage.enabled table property is enabled, the random hash goes after trino-tmp-files instead of before it (a sketch follows this list)
  2. Create separate logic for buffer files vs. data files, and introduce a new config specific to the buffer files.
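
A minimal sketch of option 1 (names are hypothetical; computeHash stands in for the same hash function Iceberg applies to data files):

// Compute a fresh hash per buffer file and place it after trino-tmp-files,
// so buffer writes fan out across prefixes while keeping a stable root.
String tempName = "sort-buffer-" + UUID.randomUUID(); // java.util.UUID
String hash = computeHash(tempName);
Location tempFilePrefix = Location.of(tableLocation)
        .appendPath("trino-tmp-files")
        .appendPath(hash)
        .appendPath(tempName);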

On first glance option 2 seems unnecessary, but there is some value in controlling these independently. We use S3 lifecycle rules to expire objects by prefix (the prefixes contain partition information); if the two are coupled (as in option 1), the random hash prefix would also sit before the partition prefixes, and expiring objects this way would no longer be possible. It is possible to define lifecycle rules based on object tags, but as far as I know Trino does not support tagging objects.
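
To illustrate (paths hypothetical): with the default layout a lifecycle rule can target the partition prefix directly, but with the object storage layout the hash comes first and no fixed prefix matches a partition any more:

// Default layout: rule on prefix warehouse/t/data/date=2024-01-01/ matches
s3://bucket/warehouse/t/data/date=2024-01-01/file.parquet
// Object storage layout: the random hash precedes the partition directory
s3://bucket/warehouse/t/data/k8Yt2a/date=2024-01-01/file.parquet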

Queries taking very long/timing out due to S3 I/O

This is not quite a failure in the literal sense, since we can adjust the query timeout, but an insert running for 12-13+ hours is not ideal, especially when it is largely due to slow S3 I/O.

We can introduce a config in trino-iceberg, similar to trino-hive's hive.temporary-staging-directory-enabled (and hive.temporary-staging-directory-path):

@Config("hive.temporary-staging-directory-enabled")

This simply lets Trino use storage local to the Trino worker for these buffer files, completely circumventing S3 and avoiding the I/O bottleneck.
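
A minimal sketch of what the Iceberg-side config could look like (property names and the default path are placeholders mirroring trino-hive's HiveConfig, not an agreed-upon API):

import io.airlift.configuration.Config;

public class IcebergSortStagingConfig
{
    private boolean temporaryStagingDirectoryEnabled;
    private String temporaryStagingDirectoryPath = "/tmp/trino-${USER}";

    public boolean isTemporaryStagingDirectoryEnabled()
    {
        return temporaryStagingDirectoryEnabled;
    }

    @Config("iceberg.temporary-staging-directory-enabled")
    public IcebergSortStagingConfig setTemporaryStagingDirectoryEnabled(boolean enabled)
    {
        this.temporaryStagingDirectoryEnabled = enabled;
        return this;
    }

    public String getTemporaryStagingDirectoryPath()
    {
        return temporaryStagingDirectoryPath;
    }

    @Config("iceberg.temporary-staging-directory-path")
    public IcebergSortStagingConfig setTemporaryStagingDirectoryPath(String path)
    {
        this.temporaryStagingDirectoryPath = path;
        return this;
    }
}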

Both changes can be added and controlled with configs, letting users choose whichever option works better for them.

I intend to submit a PR myself to address these, but wanted to start the conversation now and discuss options/solutions.
