Skip to content

Commit 03ed2e4

Browse files
committed
Allow skipping of data deletion in expire_snapshots
1 parent 5213e22 commit 03ed2e4

File tree

5 files changed

+55
-4
lines changed

5 files changed

+55
-4
lines changed

docs/src/main/sphinx/connector/iceberg.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,7 @@ with the `retention_threshold` parameter.
884884
`expire_snapshots` can be run as follows:
885885

886886
```sql
887-
ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d');
887+
ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d', delete_files => true);
888888
```
889889

890890
The value for `retention_threshold` must be higher than or equal to
@@ -893,6 +893,10 @@ procedure fails with a similar message: `Retention specified (1.00d) is shorter
893893
than the minimum retention configured in the system (7.00d)`. The default value
894894
for this property is `7d`.
895895

896+
The value for `delete_files` can be `true` or `false`. When set to `false`,
897+
files associated with the expired snapshots are not deleted. The default
898+
value for this property is `true`.
899+
896900
(iceberg-remove-orphan-files)=
897901
##### remove_orphan_files
898902

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ public class IcebergMetadata
417417
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2;
418418
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
419419
private static final String RETENTION_THRESHOLD = "retention_threshold";
420+
private static final String DELETE_FILES = "delete_files";
420421
private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
421422
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.<String>builder()
422423
.add(EXTRA_PROPERTIES_PROPERTY)
@@ -1722,12 +1723,13 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForDropExtendedStats
17221723
private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
17231724
{
17241725
Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD);
1726+
boolean deleteFiles = (boolean) executeProperties.get(DELETE_FILES);
17251727
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
17261728

17271729
return Optional.of(new IcebergTableExecuteHandle(
17281730
tableHandle.getSchemaTableName(),
17291731
EXPIRE_SNAPSHOTS,
1730-
new IcebergExpireSnapshotsHandle(retentionThreshold),
1732+
new IcebergExpireSnapshotsHandle(retentionThreshold, deleteFiles),
17311733
icebergTable.location(),
17321734
icebergTable.io().properties()));
17331735
}
@@ -2184,6 +2186,7 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
21842186

21852187
// ForwardingFileIo handles bulk operations so no separate function implementation is needed
21862188
table.expireSnapshots()
2189+
.cleanExpiredFiles(expireSnapshotsHandle.deleteFiles())
21872190
.expireOlderThan(session.getStart().toEpochMilli() - retention.toMillis())
21882191
.planWith(icebergScanExecutor)
21892192
.commit();

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
2222
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
2323
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
24+
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
2425

2526
public class ExpireSnapshotsTableProcedure
2627
implements Provider<TableProcedureMetadata>
@@ -36,6 +37,11 @@ public TableProcedureMetadata get()
3637
"retention_threshold",
3738
"Only snapshots older than threshold should be removed",
3839
Duration.valueOf("7d"),
40+
false),
41+
booleanProperty(
42+
"delete_files",
43+
"Delete underlying files associated to the expired snapshot(s)",
44+
true,
3945
false)));
4046
}
4147
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import static java.util.Objects.requireNonNull;
1919

20-
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold)
20+
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold, boolean deleteFiles)
2121
implements IcebergProcedureHandle
2222
{
2323
public IcebergExpireSnapshotsHandle

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6551,6 +6551,44 @@ public void testExpireSnapshotsPartitionedTable()
65516551
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
65526552
}
65536553

6554+
@Test
6555+
public void testExpireSnapshotsKeepFiles()
6556+
throws Exception
6557+
{
6558+
try (TestTable table = newTrinoTable("test_expiring_snapshots_", "(key varchar, value integer)")) {
6559+
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
6560+
assertUpdate("INSERT INTO " + table.getName() + " VALUES ('one', 1)", 1);
6561+
assertUpdate("INSERT INTO " + table.getName() + " VALUES ('two', 2)", 1);
6562+
List<String> initialFiles = getAllDataFilesFromTableDirectory(table.getName());
6563+
6564+
assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)");
6565+
6566+
List<String> updatedDataFiles = getAllDataFilesFromTableDirectory(table.getName());
6567+
assertThat(updatedDataFiles).containsExactlyInAnyOrderElementsOf(initialFiles);
6568+
assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + table.getName()))
6569+
.matches("VALUES (BIGINT '3', VARCHAR 'one two')");
6570+
}
6571+
}
6572+
6573+
@Test
6574+
public void testExpireSnapshotsPartitionedTableKeepFiles()
6575+
throws Exception
6576+
{
6577+
try (TestTable table = newTrinoTable("test_expiring_snapshots_partitioned_table", "(col1 BIGINT, col2 BIGINT) WITH (partitioning = ARRAY['col1'])")) {
6578+
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
6579+
assertUpdate("INSERT INTO " + table.getName() + " VALUES(1, 100), (1, 101), (1, 102), (2, 200), (2, 201), (3, 300)", 6);
6580+
assertUpdate("DELETE FROM " + table.getName() + " WHERE col1 = 1", 3);
6581+
assertUpdate("INSERT INTO " + table.getName() + " VALUES(4, 400)", 1);
6582+
List<String> initialFiles = getAllDataFilesFromTableDirectory(table.getName());
6583+
6584+
assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)");
6585+
6586+
List<String> updatedDataFiles = getAllDataFilesFromTableDirectory(table.getName());
6587+
assertThat(updatedDataFiles).containsExactlyInAnyOrderElementsOf(initialFiles);
6588+
assertQuery("SELECT sum(col2) FROM " + table.getName(), "SELECT 1101");
6589+
}
6590+
}
6591+
65546592
@Test
65556593
public void testExpireSnapshotsOnSnapshot()
65566594
{
@@ -6585,7 +6623,7 @@ public void testExplainExpireSnapshotOutput()
65856623
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);
65866624

65876625
assertExplain("EXPLAIN ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')",
6588-
"SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\[retentionThreshold=0\\.00s].*");
6626+
"SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\[retentionThreshold=0\\.00s, deleteFiles=true].*");
65896627
}
65906628

65916629
@Test

0 commit comments

Comments
 (0)