
[HUDI-9359] add TIMESTAMP_MILLIS and TIME_MILLIS to InternalSchema #13291


Open · wants to merge 5 commits into master
@@ -267,8 +267,10 @@ private static DataType constructSparkSchemaFromType(Type type) {
       case DATE:
         return DateType$.MODULE$;
       case TIME:
+      case TIME_MILLIS:
         throw new UnsupportedOperationException(String.format("cannot convert %s type to Spark", type));
       case TIMESTAMP:
+      case TIMESTAMP_MILLIS:
         // todo support TimeStampNTZ
         return TimestampType$.MODULE$;
       case STRING:
@@ -39,7 +39,9 @@ object HoodieDataTypeUtils {
     StructType.fromString(jsonSchema)

   def canUseRowWriter(schema: Schema, conf: Configuration): Boolean = {
-    if (conf.getBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, true)) {
+    if (HoodieAvroUtils.hasTimestampMillisField(schema)) {
Contributor:

Do we have a UT or integration test for Spark already?

Contributor Author:

I tested HoodieAvroUtils.hasTimestampMillisField but not canUseRowWriter. TBH I'm not sure if I should just get rid of it. I think there is a Spark setting that allows writing timestamp millis, so maybe it should check whether both millis and micros are used at the same time, and also check that config. But we also have a Hudi config that sets that config. I'm not sure how much value all the time spent validating that would add. So maybe for now we just remove it?

Contributor:

> maybe it should check whether both millis and micros are used at the same time, and also check that config. But we also have a Hudi config that sets that config

The default timestamp precision in Spark is 6 (microseconds). Are you saying users would specify an explicit precision of 3 for some columns? I'd guess most cases use either the default precision 6 or precision 3 throughout; the mixed case should be very rare.

Is the patch to fix the schema evolution use case for timestamp(3)?

Contributor Author:

The fix is for this issue:
#13233

Hudi streamers always force timestamps into micros, no matter what the user specifies in the output schema, when creating a new table. As you can see in the internal converter, whichever timestamp variant the output schema uses (millis or micros), you always end up with micros.

The OR clause here makes that clear:
https://github.com/apache/hudi/pull/13291/files#diff-2d823101c425b4f9fbc444d1def5b6ebe1607bf19b532c80f5b0851cfd27a292

And is reproducible in the script in the linked issue.
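
A minimal, self-contained sketch of the pre-patch behavior described above (simplified and illustrative, not the actual Hudi converter code), using only Avro's public API to show how the millis/micros distinction is lost on the round trip:

```java
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class PrePatchTimestampRoundTrip {
  public static void main(String[] args) {
    Schema input = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));

    // Avro -> internal: pre-patch, millis and micros both collapsed into the
    // single internal TIMESTAMP type, so only "is a timestamp" survives.
    LogicalType logical = input.getLogicalType();
    boolean isTimestamp = logical instanceof LogicalTypes.TimestampMillis
        || logical instanceof LogicalTypes.TimestampMicros;

    // Internal -> Avro: TIMESTAMP was always serialized as timestamp-micros.
    Schema output = isTimestamp
        ? LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))
        : input;

    System.out.println(output.getLogicalType().getName()); // timestamp-micros
  }
}
```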

Contributor:

Hmm, then I think we should support timestamp(3) for Spark. @jonvex, do you think it is feasible?
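
For reference, the Spark setting alluded to earlier in this thread is presumably spark.sql.parquet.outputTimestampType, which controls the physical timestamp type Spark uses when writing Parquet. A minimal sketch (the class name, app name, and output path are illustrative):

```java
import org.apache.spark.sql.SparkSession;

public class TimestampMillisWriteDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("timestamp-millis-demo")
        // Accepted values include INT96, TIMESTAMP_MICROS, and TIMESTAMP_MILLIS.
        .config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
        .getOrCreate();

    // With the setting above, the ts column is written to Parquet as
    // timestamp-millis rather than timestamp-micros.
    spark.range(1)
        .selectExpr("current_timestamp() as ts")
        .write()
        .parquet("/tmp/ts_millis_demo");

    spark.stop();
  }
}
```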

+      false
+    } else if (conf.getBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, true)) {
       // if we can write lists with the old list structure, we can use row writer regardless of decimal precision
       true
     } else if (!HoodieAvroUtils.hasSmallPrecisionDecimalField(schema)) {
@@ -1255,6 +1255,33 @@ public static boolean hasListOrMapField(Schema schema) {
     }
   }

+  /**
+   * Checks whether the provided schema contains a timestamp-millis field.
+   *
+   * @param schema the input schema
+   * @return true if a timestamp-millis field is present, false otherwise
+   */
+  public static boolean hasTimestampMillisField(Schema schema) {
+    switch (schema.getType()) {
+      case RECORD:
+        for (Field field : schema.getFields()) {
+          if (hasTimestampMillisField(field.schema())) {
+            return true;
+          }
+        }
+        return false;
+      case ARRAY:
+        return hasTimestampMillisField(schema.getElementType());
+      case MAP:
+        return hasTimestampMillisField(schema.getValueType());
+      case UNION:
+        return hasTimestampMillisField(getActualSchemaFromUnion(schema, null));
+      case LONG:
+        return LogicalTypes.timestampMillis().equals(schema.getLogicalType());
+      default:
+        return false;
+    }
+  }
+
   /**
    * Avro does not support type promotion from numbers to string. This function returns true if
    * it will be necessary to rewrite the record to support this promotion.
@@ -39,7 +39,9 @@ public class Comparators {
       put(Types.DoubleType.get(), Comparator.naturalOrder());
       put(Types.DateType.get(), Comparator.naturalOrder());
       put(Types.TimeType.get(), Comparator.naturalOrder());
+      put(Types.TimeMillisType.get(), Comparator.naturalOrder());
       put(Types.TimestampType.get(), Comparator.naturalOrder());
+      put(Types.TimestampMillisType.get(), Comparator.naturalOrder());
       put(Types.StringType.get(), Comparator.naturalOrder());
       put(Types.UUIDType.get(), Comparator.naturalOrder());
     }
@@ -62,7 +62,9 @@ enum TypeID {
     DATE(Integer.class),
     BOOLEAN(Boolean.class),
     TIME(Long.class),
+    TIME_MILLIS(Integer.class),
Contributor:

Should the primitive type for the time-millis logical type also be Long?

Contributor:

Looks like Integer type is aligned with Avro. Could you add a note here in a comment?
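
For reference, Avro itself pins time-millis to a 32-bit int (milliseconds since midnight fit comfortably in an int) while time-micros requires a long, which is presumably why Integer.class is used above. A quick self-contained check (the class name is illustrative):

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class TimeMillisBackingType {
  public static void main(String[] args) {
    // addToSchema validates the backing primitive: time-millis only accepts
    // INT, and time-micros only accepts LONG.
    Schema timeMillis = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
    Schema timeMicros = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));

    System.out.println(timeMillis.getType()); // INT
    System.out.println(timeMicros.getType()); // LONG
  }
}
```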

     TIMESTAMP(Long.class),
+    TIMESTAMP_MILLIS(Long.class),
     DECIMAL(BigDecimal.class),
     UUID(UUID.class);
     private final String name;
@@ -187,6 +187,30 @@ public String toString() {
     }
   }

+  /**
+   * Time-millis primitive type.
+   */
+  public static class TimeMillisType extends PrimitiveType {
+    private static final TimeMillisType INSTANCE = new TimeMillisType();
+
+    public static TimeMillisType get() {
+      return INSTANCE;
+    }
+
+    private TimeMillisType() {
+    }
+
+    @Override
+    public TypeID typeId() {
+      return TypeID.TIME_MILLIS;
+    }
+
+    @Override
+    public String toString() {
+      return "time-millis";
+    }
+  }
+
   /**
    * Time primitive type.
    */
@@ -211,6 +235,30 @@ public String toString() {
     }
   }

+  /**
+   * Timestamp-millis primitive type.
+   */
+  public static class TimestampMillisType extends PrimitiveType {
+    private static final TimestampMillisType INSTANCE = new TimestampMillisType();
+
+    public static TimestampMillisType get() {
+      return INSTANCE;
+    }
+
+    private TimestampMillisType() {
+    }
+
+    @Override
+    public TypeID typeId() {
+      return TypeID.TIMESTAMP_MILLIS;
+    }
+
+    @Override
+    public String toString() {
+      return "timestamp-millis";
+    }
+  }
+
   /**
    * String primitive type.
    */
@@ -345,14 +345,13 @@ private static Type visitAvroPrimitiveToBuildInternalType(Schema primitive) {
     } else if (logical instanceof LogicalTypes.Date) {
       return Types.DateType.get();

-    } else if (
-        logical instanceof LogicalTypes.TimeMillis
-            || logical instanceof LogicalTypes.TimeMicros) {
+    } else if (logical instanceof LogicalTypes.TimeMillis) {
+      return Types.TimeMillisType.get();
+    } else if (logical instanceof LogicalTypes.TimeMicros) {
       return Types.TimeType.get();
-
-    } else if (
-        logical instanceof LogicalTypes.TimestampMillis
-            || logical instanceof LogicalTypes.TimestampMicros) {
+    } else if (logical instanceof LogicalTypes.TimestampMillis) {
+      return Types.TimestampMillisType.get();
+    } else if (logical instanceof LogicalTypes.TimestampMicros) {
Contributor (on lines +348 to +354):
Why would type handling go through the schema-evolution-on-read path here? Supposedly the schema-evolution-on-read logic should not be invoked by default. Is the logic leaking into the non-schema-on-read code path?

       return Types.TimestampType.get();
     } else if (LogicalTypes.uuid().getName().equals(name)) {
       return Types.UUIDType.get();
@@ -542,9 +541,15 @@ private static Schema visitInternalPrimitiveToBuildAvroPrimitiveType(Type.Primit
       case TIME:
         return LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));

+      case TIME_MILLIS:
+        return LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
+
       case TIMESTAMP:
         return LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));

+      case TIMESTAMP_MILLIS:
+        return LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+
       case STRING:
         return Schema.create(Schema.Type.STRING);
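With the two hunks above combined, a timestamp-millis schema now survives the round trip. A minimal sketch under the same simplifying assumptions as the earlier example (illustrative, not the actual Hudi converter code):

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class PostPatchTimestampRoundTrip {
  public static void main(String[] args) {
    Schema input = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));

    // Avro -> internal: millis and micros now map to distinct internal types
    // (TIMESTAMP_MILLIS vs TIMESTAMP).
    boolean isMillis = input.getLogicalType() instanceof LogicalTypes.TimestampMillis;

    // Internal -> Avro: each internal type serializes back to its own logical type.
    Schema output = isMillis
        ? LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
        : LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));

    System.out.println(output.getLogicalType().getName()); // timestamp-millis
  }
}
```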
@@ -217,8 +217,12 @@ private static Type parseTypeFromJson(JsonNode jsonNode) {
         return Types.DateType.get();
       case TIME:
         return Types.TimeType.get();
+      case TIME_MILLIS:
+        return Types.TimeMillisType.get();
       case TIMESTAMP:
         return Types.TimestampType.get();
+      case TIMESTAMP_MILLIS:
+        return Types.TimestampMillisType.get();
       case STRING:
         return Types.StringType.get();
       case UUID:
@@ -876,6 +876,47 @@ void testHasListOrMapField() {
     assertFalse(HoodieAvroUtils.hasListOrMapField(new Schema.Parser().parse(EXAMPLE_SCHEMA)));
   }

+  @Test
+  public void testHasTimestampMillisField() {
+    Schema longWithTimestampMillis = Schema.create(Schema.Type.LONG);
+    LogicalTypes.timestampMillis().addToSchema(longWithTimestampMillis);
+
+    Schema longWithTimestampMicros = Schema.create(Schema.Type.LONG);
+    LogicalTypes.timestampMicros().addToSchema(longWithTimestampMicros);
+
+    Schema plainLong = Schema.create(Schema.Type.LONG);
+    Schema plainString = Schema.create(Schema.Type.STRING);
+
+    // test simple types
+    assertTrue(HoodieAvroUtils.hasTimestampMillisField(longWithTimestampMillis));
+    assertFalse(HoodieAvroUtils.hasTimestampMillisField(longWithTimestampMicros));
+    assertFalse(HoodieAvroUtils.hasTimestampMillisField(plainLong));
+    assertFalse(HoodieAvroUtils.hasTimestampMillisField(plainString));
+
+    // test records
+    Schema recordWithTimestampMillis = Schema.createRecord("RecordWithTSMillis", null, null, false);
+    recordWithTimestampMillis.setFields(Arrays.asList(new Schema.Field("tsmicros", longWithTimestampMicros, null, null),
+        new Schema.Field("tsmillis", longWithTimestampMillis, null, null), new Schema.Field("longfield", plainLong, null, null),
+        new Schema.Field("stringfield", plainString, null, null)));
+    assertTrue(HoodieAvroUtils.hasTimestampMillisField(recordWithTimestampMillis));
+    Schema recordWithoutTimestampMillis = Schema.createRecord("RecordWithoutTSMillis", null, null, false);
+    recordWithoutTimestampMillis.setFields(Arrays.asList(new Schema.Field("stringfield", plainString, null, null),
+        new Schema.Field("tsmicros", longWithTimestampMicros, null, null), new Schema.Field("longfield", plainLong, null, null)));
+    assertFalse(HoodieAvroUtils.hasTimestampMillisField(recordWithoutTimestampMillis));
+
+    // test arrays
+    assertTrue(HoodieAvroUtils.hasTimestampMillisField(Schema.createArray(recordWithTimestampMillis)));
+    assertFalse(HoodieAvroUtils.hasTimestampMillisField(Schema.createArray(recordWithoutTimestampMillis)));
+
+    // test maps
+    assertTrue(HoodieAvroUtils.hasTimestampMillisField(Schema.createMap(recordWithTimestampMillis)));
+    assertFalse(HoodieAvroUtils.hasTimestampMillisField(Schema.createMap(recordWithoutTimestampMillis)));
+
+    // test unions
+    assertTrue(HoodieAvroUtils.hasTimestampMillisField(Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), recordWithTimestampMillis))));
+    assertFalse(HoodieAvroUtils.hasTimestampMillisField(Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), recordWithoutTimestampMillis))));
+  }
+
   @Test
   void testHasSmallPrecisionDecimalField() {
     assertTrue(HoodieAvroUtils.hasSmallPrecisionDecimalField(new Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD)));
28 changes: 28 additions & 0 deletions hudi-common/src/test/resources/nullRight.avsc
@@ -208,6 +208,34 @@
         }
       ]
     }
+    },
+    {
+      "name": "timeMicroField",
+      "type": {
+        "type": "long",
+        "logicalType": "time-micros"
+      }
+    },
+    {
+      "name": "timeMillisField",
+      "type": {
+        "type": "int",
+        "logicalType": "time-millis"
+      }
+    },
+    {
+      "name": "timestampMicrosField",
+      "type": {
+        "type": "long",
+        "logicalType": "timestamp-micros"
+      }
+    },
+    {
+      "name": "timestampMillisField",
+      "type": {
+        "type": "long",
+        "logicalType": "timestamp-millis"
+      }
     }
   ]
 }
28 changes: 28 additions & 0 deletions hudi-common/src/test/resources/nullWrong.avsc
@@ -198,6 +198,34 @@
         "null"
       ]
     }
+    },
+    {
+      "name": "timeMicroField",
+      "type": {
+        "type": "long",
+        "logicalType": "time-micros"
+      }
+    },
+    {
+      "name": "timeMillisField",
+      "type": {
+        "type": "int",
+        "logicalType": "time-millis"
+      }
+    },
+    {
+      "name": "timestampMicrosField",
+      "type": {
+        "type": "long",
+        "logicalType": "timestamp-micros"
+      }
+    },
+    {
+      "name": "timestampMillisField",
+      "type": {
+        "type": "long",
+        "logicalType": "timestamp-millis"
+      }
     }
   ]
 }
@@ -319,13 +319,15 @@ private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
       case DOUBLE:
       case DATE:
       case TIMESTAMP:
+      case TIMESTAMP_MILLIS:
       case STRING:
       case UUID:
       case FIXED:
       case BINARY:
       case DECIMAL:
         return typeInfo;
       case TIME:
+      case TIME_MILLIS:
         throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));
       default:
         LOG.error("cannot convert unknown type: {} to Hive", type);