File tree Expand file tree Collapse file tree 1 file changed +6
-0
lines changed Expand file tree Collapse file tree 1 file changed +6
-0
lines changed Original file line number Diff line number Diff line change @@ -935,6 +935,12 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
935
935
throw new Error ( `Partition session with id ${ msg . partitionSessionId } not found.` ) ;
936
936
}
937
937
938
+ // Ensure the message's partition ID matches the partition session's partition ID
939
+ // This is crucial for consistency, as messages must be committed to the correct partition
940
+ if ( msg . partitionId !== partitionSession . partitionId ) {
941
+ throw new Error ( `Message partitionId ${ msg . partitionId } does not match partition session partitionId ${ partitionSession . partitionId } .` ) ;
942
+ }
943
+
938
944
// Initialize empty array for this partition if it doesn't exist yet
939
945
if ( ! offsets . has ( partitionSession . partitionSessionId ) ) {
940
946
offsets . set ( partitionSession . partitionSessionId , [ ] ) ;
You can’t perform that action at this time.
0 commit comments