Skip to content

Commit ae9e745

Browse files
committed
Introduce DeltaLakeFileSystemFactory and VendedCredentialsProvider in Delta Lake
1 parent af38a3c commit ae9e745

File tree

59 files changed

+761
-331
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+761
-331
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import io.airlift.slice.Slice;
2424
import io.trino.filesystem.Location;
2525
import io.trino.filesystem.TrinoFileSystem;
26-
import io.trino.filesystem.TrinoFileSystemFactory;
2726
import io.trino.metastore.Partitions;
2827
import io.trino.parquet.writer.ParquetWriterOptions;
2928
import io.trino.plugin.deltalake.DataFileInfo.DataFileType;
29+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
3030
import io.trino.plugin.deltalake.util.DeltaLakeWriteUtils;
3131
import io.trino.plugin.hive.parquet.ParquetFileWriter;
3232
import io.trino.spi.Page;
@@ -118,10 +118,11 @@ public AbstractDeltaLakePageSink(
118118
List<DeltaLakeColumnHandle> inputColumns,
119119
List<String> originalPartitionColumns,
120120
PageIndexerFactory pageIndexerFactory,
121-
TrinoFileSystemFactory fileSystemFactory,
121+
DeltaLakeFileSystemFactory fileSystemFactory,
122122
int maxOpenWriters,
123123
JsonCodec<DataFileInfo> dataFileInfoCodec,
124124
Location tableLocation,
125+
VendedCredentialsHandle credentialsHandle,
125126
Location outputPathDirectory,
126127
ConnectorSession session,
127128
DeltaLakeWriterStats stats,
@@ -133,7 +134,7 @@ public AbstractDeltaLakePageSink(
133134

134135
requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
135136

136-
this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null").create(session);
137+
this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null").create(session, credentialsHandle);
137138
this.maxOpenWriters = maxOpenWriters;
138139
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
139140
this.parquetSchemaMapping = requireNonNull(parquetSchemaMapping, "parquetSchemaMapping is null");

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import com.google.common.collect.ImmutableList;
1717
import io.airlift.units.DataSize;
1818
import io.trino.filesystem.TrinoFileSystem;
19-
import io.trino.filesystem.TrinoFileSystemFactory;
2019
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
2120
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
2221
import io.trino.plugin.deltalake.transactionlog.Transaction;
@@ -53,13 +52,13 @@ public abstract class BaseTransactionsTable
5352
implements SystemTable
5453
{
5554
private final DeltaMetastoreTable table;
56-
private final TrinoFileSystemFactory fileSystemFactory;
55+
private final DeltaLakeFileSystemFactory fileSystemFactory;
5756
private final TransactionLogAccess transactionLogAccess;
5857
private final ConnectorTableMetadata tableMetadata;
5958

6059
public BaseTransactionsTable(
6160
DeltaMetastoreTable table,
62-
TrinoFileSystemFactory fileSystemFactory,
61+
DeltaLakeFileSystemFactory fileSystemFactory,
6362
TransactionLogAccess transactionLogAccess,
6463
TypeManager typeManager,
6564
ConnectorTableMetadata tableMetadata)
@@ -86,12 +85,13 @@ public ConnectorTableMetadata getTableMetadata()
8685
@Override
8786
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
8887
{
88+
TrinoFileSystem fileSystem = fileSystemFactory.create(session, table);
8989
long snapshotVersion;
9090
try {
9191
// Verify the transaction log is readable
9292
TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(session, table, Optional.empty());
9393
snapshotVersion = tableSnapshot.getVersion();
94-
transactionLogAccess.getMetadataEntry(session, tableSnapshot);
94+
transactionLogAccess.getMetadataEntry(session, fileSystem, tableSnapshot);
9595
}
9696
catch (IOException e) {
9797
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + table.location(), e);
@@ -137,7 +137,6 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
137137
endVersionInclusive = Optional.of(snapshotVersion);
138138
}
139139

140-
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
141140
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
142141
try {
143142
List<Transaction> transactions = loadNewTailBackward(fileSystem, table.location(), startVersionExclusive, endVersionInclusive.get()).reversed();

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/CorruptedDeltaLakeTableHandle.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,21 @@
1313
*/
1414
package io.trino.plugin.deltalake;
1515

16+
import io.trino.plugin.deltalake.metastore.VendedCredentials;
17+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
1618
import io.trino.spi.TrinoException;
1719
import io.trino.spi.connector.SchemaTableName;
1820

21+
import java.util.Optional;
22+
1923
import static java.util.Objects.requireNonNull;
2024

2125
public record CorruptedDeltaLakeTableHandle(
2226
SchemaTableName schemaTableName,
27+
boolean catalogOwned,
2328
boolean managed,
2429
String location,
30+
Optional<VendedCredentials> vendedCredentials,
2531
TrinoException originalException)
2632
implements LocatedTableHandle
2733
{
@@ -37,4 +43,10 @@ public TrinoException createException()
3743
// Original exception originates from a different place. Create a new exception not to confuse reader with a stacktrace not matching call site.
3844
return new TrinoException(originalException::getErrorCode, originalException.getMessage(), originalException);
3945
}
46+
47+
@Override
48+
public VendedCredentialsHandle toCredentialsHandle()
49+
{
50+
return new VendedCredentialsHandle(catalogOwned, managed, location, vendedCredentials);
51+
}
4052
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import com.google.inject.Inject;
17+
import io.trino.filesystem.TrinoFileSystem;
18+
import io.trino.filesystem.TrinoFileSystemFactory;
19+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
20+
import io.trino.spi.connector.ConnectorSession;
21+
import io.trino.spi.security.ConnectorIdentity;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
public class DefaultDeltaLakeFileSystemFactory
26+
implements DeltaLakeFileSystemFactory
27+
{
28+
private final TrinoFileSystemFactory fileSystemFactory;
29+
30+
@Inject
31+
public DefaultDeltaLakeFileSystemFactory(TrinoFileSystemFactory fileSystemFactory)
32+
{
33+
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
34+
}
35+
36+
@Override
37+
public TrinoFileSystem create(ConnectorSession session, VendedCredentialsHandle table)
38+
{
39+
return fileSystemFactory.create(session.getIdentity());
40+
}
41+
42+
@Override
43+
public TrinoFileSystem create(ConnectorSession session, String tableLocation)
44+
{
45+
return fileSystemFactory.create(session.getIdentity());
46+
}
47+
48+
@Override
49+
public TrinoFileSystem create(ConnectorIdentity identity)
50+
{
51+
throw new UnsupportedOperationException();
52+
}
53+
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCdfPageSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import io.airlift.json.JsonCodec;
1717
import io.trino.filesystem.Location;
18-
import io.trino.filesystem.TrinoFileSystemFactory;
18+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
1919
import io.trino.spi.PageIndexerFactory;
2020
import io.trino.spi.connector.ConnectorSession;
2121
import io.trino.spi.type.TypeOperators;
@@ -35,10 +35,11 @@ public DeltaLakeCdfPageSink(
3535
List<DeltaLakeColumnHandle> inputColumns,
3636
List<String> originalPartitionColumns,
3737
PageIndexerFactory pageIndexerFactory,
38-
TrinoFileSystemFactory fileSystemFactory,
38+
DeltaLakeFileSystemFactory fileSystemFactory,
3939
int maxOpenWriters,
4040
JsonCodec<DataFileInfo> dataFileInfoCodec,
4141
Location tableLocation,
42+
VendedCredentialsHandle credentialsHandle,
4243
Location outputPath,
4344
ConnectorSession session,
4445
DeltaLakeWriterStats stats,
@@ -54,6 +55,7 @@ public DeltaLakeCdfPageSink(
5455
maxOpenWriters,
5556
dataFileInfoCodec,
5657
tableLocation,
58+
credentialsHandle,
5759
outputPath,
5860
session,
5961
stats,
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import io.trino.filesystem.TrinoFileSystem;
17+
import io.trino.filesystem.TrinoFileSystemFactory;
18+
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
19+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
20+
import io.trino.spi.connector.ConnectorSession;
21+
22+
public interface DeltaLakeFileSystemFactory
23+
extends TrinoFileSystemFactory
24+
{
25+
default TrinoFileSystem create(ConnectorSession session, DeltaLakeTableHandle table)
26+
{
27+
return create(session, table.toCredentialsHandle());
28+
}
29+
30+
default TrinoFileSystem create(ConnectorSession session, DeltaMetastoreTable table)
31+
{
32+
return create(session, new VendedCredentialsHandle(table.catalogOwned(), table.managed(), table.location(), table.vendedCredentials()));
33+
}
34+
35+
TrinoFileSystem create(ConnectorSession session, VendedCredentialsHandle table);
36+
37+
/**
38+
* For external table create/write using location
39+
*/
40+
TrinoFileSystem create(ConnectorSession session, String tableLocation);
41+
42+
@Override
43+
default TrinoFileSystem create(ConnectorSession session)
44+
{
45+
throw new UnsupportedOperationException();
46+
}
47+
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import io.trino.filesystem.TrinoFileSystem;
18-
import io.trino.filesystem.TrinoFileSystemFactory;
1918
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
2019
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
2120
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
@@ -46,7 +45,7 @@ public class DeltaLakeHistoryTable
4645
{
4746
public DeltaLakeHistoryTable(
4847
DeltaMetastoreTable table,
49-
TrinoFileSystemFactory fileSystemFactory,
48+
DeltaLakeFileSystemFactory fileSystemFactory,
5049
TransactionLogAccess transactionLogAccess,
5150
TypeManager typeManager)
5251
{

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.plugin.deltalake;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
1718
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
1819
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
1920
import io.trino.spi.connector.ConnectorInsertTableHandle;
@@ -30,7 +31,8 @@ public record DeltaLakeInsertTableHandle(
3031
ProtocolEntry protocolEntry,
3132
List<DeltaLakeColumnHandle> inputColumns,
3233
long readVersion,
33-
boolean retriesEnabled)
34+
boolean retriesEnabled,
35+
VendedCredentialsHandle credentialsHandle)
3436
implements ConnectorInsertTableHandle
3537
{
3638
public DeltaLakeInsertTableHandle
@@ -40,6 +42,7 @@ public record DeltaLakeInsertTableHandle(
4042
requireNonNull(protocolEntry, "protocolEntry is null");
4143
inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
4244
requireNonNull(location, "location is null");
45+
requireNonNull(credentialsHandle, "credentialsHandle is null");
4346
}
4447

4548
@Override

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.airlift.slice.Slices;
2222
import io.trino.filesystem.Location;
2323
import io.trino.filesystem.TrinoFileSystem;
24-
import io.trino.filesystem.TrinoFileSystemFactory;
2524
import io.trino.filesystem.TrinoInputFile;
2625
import io.trino.parquet.ParquetDataSource;
2726
import io.trino.parquet.ParquetReaderOptions;
@@ -31,6 +30,7 @@
3130
import io.trino.parquet.writer.ParquetWriterOptions;
3231
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
3332
import io.trino.plugin.deltalake.delete.RoaringBitmapArray;
33+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
3434
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
3535
import io.trino.plugin.hive.parquet.ParquetFileWriter;
3636
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
@@ -139,14 +139,15 @@ public class DeltaLakeMergeSink
139139

140140
public DeltaLakeMergeSink(
141141
TypeOperators typeOperators,
142-
TrinoFileSystemFactory fileSystemFactory,
142+
DeltaLakeFileSystemFactory fileSystemFactory,
143143
ConnectorSession session,
144144
DateTimeZone parquetDateTimeZone,
145145
String trinoVersion,
146146
JsonCodec<DataFileInfo> dataFileInfoCodec,
147147
JsonCodec<DeltaLakeMergeResult> mergeResultJsonCodec,
148148
DeltaLakeWriterStats writerStats,
149149
Location rootTableLocation,
150+
VendedCredentialsHandle credentialsHandle,
150151
ConnectorPageSink insertPageSink,
151152
List<DeltaLakeColumnHandle> tableColumns,
152153
int domainCompactionThreshold,
@@ -162,7 +163,7 @@ public DeltaLakeMergeSink(
162163
{
163164
this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
164165
this.session = requireNonNull(session, "session is null");
165-
this.fileSystem = fileSystemFactory.create(session);
166+
this.fileSystem = fileSystemFactory.create(session, credentialsHandle);
166167
this.parquetDateTimeZone = requireNonNull(parquetDateTimeZone, "parquetDateTimeZone is null");
167168
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
168169
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");

0 commit comments

Comments
 (0)