@@ -303,7 +303,7 @@ def __init__(
303
303
type_schema = make_avsc_object (atype , names )
304
304
except Exception as e :
305
305
raise SchemaParseException (
306
- f'Type property "{ atype } " not a valid Avro schema. '
306
+ f'Type property "{ atype } " not a valid Avro schema: { e } '
307
307
) from e
308
308
self .set_prop ("type" , type_schema )
309
309
self .set_prop ("name" , name )
@@ -409,8 +409,8 @@ def __init__(
409
409
items_schema = make_avsc_object (items , names )
410
410
except Exception as err :
411
411
raise SchemaParseException (
412
- f"Items schema ({ items } ) not a valid Avro schema: (known "
413
- f"names: { list (names .names .keys ())} )."
412
+ f"Items schema ({ items } ) not a valid Avro schema: { err } . "
413
+ f"Known names: { list (names .names .keys ())} )."
414
414
) from err
415
415
416
416
self .set_prop ("items" , items_schema )
@@ -451,7 +451,7 @@ def __init__(
451
451
new_schema = make_avsc_object (schema , names )
452
452
except Exception as err :
453
453
raise SchemaParseException (
454
- f"Union item must be a valid Avro schema: { schema } "
454
+ f"Union item must be a valid Avro schema: { err } ; { schema } , "
455
455
) from err
456
456
# check the new schema
457
457
if (
@@ -477,7 +477,7 @@ class RecordSchema(NamedSchema):
477
477
def make_field_objects (field_data : List [PropsType ], names : Names ) -> List [Field ]:
478
478
"""We're going to need to make message parameters too."""
479
479
field_objects = [] # type: List[Field]
480
- field_names = [] # type: List [str]
480
+ parsed_fields : Dict [str , PropsType ] = {}
481
481
for field in field_data :
482
482
if hasattr (field , "get" ) and callable (field .get ):
483
483
atype = field .get ("type" )
@@ -504,10 +504,15 @@ def make_field_objects(field_data: List[PropsType], names: Names) -> List[Field]
504
504
atype , name , has_default , default , order , names , doc , other_props
505
505
)
506
506
# make sure field name has not been used yet
507
- if new_field .name in field_names :
508
- fail_msg = f"Field name { new_field .name } already in use."
509
- raise SchemaParseException (fail_msg )
510
- field_names .append (new_field .name )
507
+ if new_field .name in parsed_fields :
508
+ old_field = parsed_fields [new_field .name ]
509
+ if not is_subtype (old_field ["type" ], field ["type" ]):
510
+ raise SchemaParseException (
511
+ f"Field name { new_field .name } already in use with "
512
+ "incompatible type. "
513
+ f"{ field ['type' ]} vs { old_field ['type' ]} ."
514
+ )
515
+ parsed_fields [new_field .name ] = field
511
516
else :
512
517
raise SchemaParseException (f"Not a valid field: { field } " )
513
518
field_objects .append (new_field )
@@ -655,3 +660,62 @@ def make_avsc_object(json_data: JsonDataType, names: Optional[Names] = None) ->
655
660
# not for us!
656
661
fail_msg = f"Could not make an Avro Schema object from { json_data } ."
657
662
raise SchemaParseException (fail_msg )
663
+
664
+
665
def is_subtype(existing: PropType, new: PropType) -> bool:
    """
    Report whether the ``new`` type spec is compatible with the ``existing`` one.

    The rules are applied in this order:

    * equal specs are compatible, as is a spec that appears verbatim as a
      member of an ``existing`` list;
    * an ``existing`` spec of ``"Any"`` accepts everything except ``None``,
      ``[]``, ``"null"``, ``["null"]`` and any list containing ``"null"``;
    * two ``array`` dicts are compared through their ``"items"``;
    * two ``enum`` dicts are compared through their ``"symbols"``;
    * two ``record`` dicts are compatible when every field of ``new`` has a
      same-named field in ``existing`` with a compatible ``"type"``;
    * two lists are compatible when every member of ``new`` is either a member
      of ``existing`` or is itself compatible with the whole ``existing`` list.

    Anything else is reported as incompatible.

    :param existing: the type spec that was seen first.
    :param new: the type spec to check against ``existing``.
    :returns: ``True`` when ``new`` is compatible with ``existing``.
    """
    # Identical specs, or a spec listed verbatim inside an existing union.
    if existing == new or (isinstance(existing, list) and new in existing):
        return True

    if existing == "Any":
        # "Any" rejects null-like specs and unions that admit null.
        null_like = new is None or new == [] or new == ["null"] or new == "null"
        return not (null_like or (isinstance(new, list) and "null" in new))

    if isinstance(existing, dict) and isinstance(new, dict):
        old_kind = existing.get("type")
        new_kind = new.get("type")

        if old_kind == "array" and new_kind == "array":
            return is_subtype(existing["items"], new["items"])

        if old_kind == "enum" and new_kind == "enum":
            return is_subtype(existing["symbols"], new["symbols"])

        if old_kind == "record" and new_kind == "record":
            for candidate in cast(List[Dict[str, Any]], new["fields"]):
                matched = False
                # No break on a match: every same-named existing field is checked.
                for known in cast(List[Dict[str, Any]], existing["fields"]):
                    if candidate["name"] != known["name"]:
                        continue
                    if not is_subtype(known["type"], candidate["type"]):
                        return False
                    matched = True
                if not matched:
                    return False
            return True

        # Two dicts that are not the same supported kind never match.
        return False

    if isinstance(existing, list) and isinstance(new, list):
        # Every member is examined; the result is decided once all are checked.
        unmatched = [
            member
            for member in new
            if member not in existing
            and not is_subtype(existing, cast(PropType, member))
        ]
        return not unmatched

    return False
0 commit comments