Skip to content

Commit fa4285e

Browse files
committed
Allow skipping of data deletion in expire_snapshots
1 parent e933971 commit fa4285e

File tree

4 files changed: 49 additions and 2 deletions.

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -416,6 +416,7 @@ public class IcebergMetadata
416416
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2;
417417
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
418418
private static final String RETENTION_THRESHOLD = "retention_threshold";
419+
private static final String DELETE_FILES = "delete_files";
419420
private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
420421
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.<String>builder()
421422
.add(EXTRA_PROPERTIES_PROPERTY)
@@ -1721,12 +1722,13 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForDropExtendedStats
17211722
private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
17221723
{
17231724
Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD);
1725+
boolean deleteFiles = (boolean) executeProperties.get(DELETE_FILES);
17241726
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
17251727

17261728
return Optional.of(new IcebergTableExecuteHandle(
17271729
tableHandle.getSchemaTableName(),
17281730
EXPIRE_SNAPSHOTS,
1729-
new IcebergExpireSnapshotsHandle(retentionThreshold),
1731+
new IcebergExpireSnapshotsHandle(retentionThreshold, deleteFiles),
17301732
icebergTable.location(),
17311733
icebergTable.io().properties()));
17321734
}
@@ -2198,6 +2200,7 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
21982200
table.expireSnapshots()
21992201
.expireOlderThan(expireTimestampMillis)
22002202
.deleteWith(deleteFunction)
2203+
.cleanExpiredFiles(expireSnapshotsHandle.deleteFiles())
22012204
.commit();
22022205

22032206
fileSystem.deleteFiles(pathsToDelete);

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
2222
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
2323
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
24+
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
2425

2526
public class ExpireSnapshotsTableProcedure
2627
implements Provider<TableProcedureMetadata>
@@ -36,6 +37,11 @@ public TableProcedureMetadata get()
3637
"retention_threshold",
3738
"Only snapshots older than threshold should be removed",
3839
Duration.valueOf("7d"),
40+
false),
41+
booleanProperty(
42+
"delete_files",
43+
"Expire snapshots without deleting the underlying data",
44+
true,
3945
false)));
4046
}
4147
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717

1818
import static java.util.Objects.requireNonNull;
1919

20-
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold)
20+
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold, boolean deleteFiles)
2121
implements IcebergProcedureHandle
2222
{
2323
public IcebergExpireSnapshotsHandle

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6550,6 +6550,44 @@ public void testExpireSnapshotsPartitionedTable()
65506550
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
65516551
}
65526552

6553+
@Test
6554+
public void testExpireSnapshotsKeepFiles()
6555+
throws Exception
6556+
{
6557+
String tableName = "test_expiring_snapshots_" + randomNameSuffix();
6558+
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
6559+
assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)");
6560+
assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
6561+
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);
6562+
assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
6563+
.matches("VALUES (BIGINT '3', VARCHAR 'one two')");
6564+
List<String> initialFiles = getAllDataFilesFromTableDirectory(tableName);
6565+
6566+
assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)");
6567+
6568+
List<String> updatedDataFiles = getAllDataFilesFromTableDirectory(tableName);
6569+
assertThat(updatedDataFiles).hasSize(initialFiles.size());
6570+
}
6571+
6572+
@Test
6573+
public void testExpireSnapshotsPartitionedTableKeepFiles()
6574+
throws Exception
6575+
{
6576+
String tableName = "test_expiring_snapshots_partitioned_table" + randomNameSuffix();
6577+
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
6578+
assertUpdate("CREATE TABLE " + tableName + " (col1 BIGINT, col2 BIGINT) WITH (partitioning = ARRAY['col1'])");
6579+
assertUpdate("INSERT INTO " + tableName + " VALUES(1, 100), (1, 101), (1, 102), (2, 200), (2, 201), (3, 300)", 6);
6580+
assertUpdate("DELETE FROM " + tableName + " WHERE col1 = 1", 3);
6581+
assertUpdate("INSERT INTO " + tableName + " VALUES(4, 400)", 1);
6582+
assertQuery("SELECT sum(col2) FROM " + tableName, "SELECT 1101");
6583+
List<String> initialFiles = getAllDataFilesFromTableDirectory(tableName);
6584+
6585+
assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)");
6586+
6587+
List<String> updatedDataFiles = getAllDataFilesFromTableDirectory(tableName);
6588+
assertThat(updatedDataFiles).hasSize(initialFiles.size());
6589+
}
6590+
65536591
@Test
65546592
public void testExpireSnapshotsOnSnapshot()
65556593
{

0 commit comments

Comments
 (0)