From c8580473cc48f0615b8188c523c18775957abfa6 Mon Sep 17 00:00:00 2001 From: Jong-Liang Nieh Date: Fri, 6 Oct 2023 16:17:26 +0800 Subject: [PATCH 1/2] Fix the the issue of non-DVB transport stream(TS) using the PID 0x10~0x1F as the PID of the video/audio streams of programs --- data.go | 4 +++- demuxer.go | 5 +++++ program_map.go | 13 +++++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/data.go b/data.go index 47cd619..cc318ff 100644 --- a/data.go +++ b/data.go @@ -3,6 +3,7 @@ package astits import ( "encoding/binary" "fmt" + "github.com/asticode/go-astikit" ) @@ -113,7 +114,8 @@ func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerDa func isPSIPayload(pid uint16, pm *programMap) bool { return pid == PIDPAT || // PAT pm.existsUnlocked(pid) || // PMT - ((pid >= 0x10 && pid <= 0x14) || (pid >= 0x1e && pid <= 0x1f)) //DVB + (((pid >= 0x10 && pid <= 0x14) || (pid >= 0x1e && pid <= 0x1f)) && //DVB + !pm.existsLocked(pid)) // for non-DVB } // isPESPayload checks whether the payload is a PES one diff --git a/demuxer.go b/demuxer.go index 044856a..fc62b74 100644 --- a/demuxer.go +++ b/demuxer.go @@ -199,6 +199,11 @@ func (dmx *Demuxer) updateData(ds []*DemuxerData) (d *DemuxerData) { } } } + if v.PMT != nil { + for _, es := range v.PMT.ElementaryStreams { + dmx.programMap.setLocked(es.ElementaryPID, v.PMT.ProgramNumber) + } + } } } return diff --git a/program_map.go b/program_map.go index 60afe23..c67a2d4 100644 --- a/program_map.go +++ b/program_map.go @@ -4,15 +4,28 @@ package astits type programMap struct { // We use map[uint32] instead map[uint16] as go runtime provide optimized hash functions for (u)int32/64 keys p map[uint32]uint16 // map[ProgramMapID]ProgramNumber + s map[uint32]uint16 // map[StreamID]ProgramNumber } // newProgramMap creates a new program ids map func newProgramMap() *programMap { return &programMap{ p: make(map[uint32]uint16), + s: make(map[uint32]uint16), } } +// setLocked sets a new stream id to the program +func (m programMap) setLocked(pid, number uint16) { + m.s[uint32(pid)] = number +} + +// existsLocked checks whether the stream with this pid exists +func (m programMap) existsLocked(pid uint16) (ok bool) { + _, ok = m.s[uint32(pid)] + return +} + // existsUnlocked checks whether the program with this pid exists func (m programMap) existsUnlocked(pid uint16) (ok bool) { _, ok = m.p[uint32(pid)] From 1cdada57584d1c46167e1d72600e9579adfc7703 Mon Sep 17 00:00:00 2001 From: Jong-Liang Nieh Date: Tue, 24 Oct 2023 16:12:41 +0800 Subject: [PATCH 2/2] Update to use a new elementaryStreamMap to control those PIDs which used by known elementary streams of programs in PMT tables --- data.go | 8 ++++---- data_test.go | 14 ++++++++------ demuxer.go | 8 +++++--- program_map.go | 13 ------------- stream_map.go | 29 +++++++++++++++++++++++++++++ stream_map_test.go | 16 ++++++++++++++++ 6 files changed, 62 insertions(+), 26 deletions(-) create mode 100644 stream_map.go create mode 100644 stream_map_test.go diff --git a/data.go b/data.go index cc318ff..e09092d 100644 --- a/data.go +++ b/data.go @@ -36,7 +36,7 @@ type MuxerData struct { } // parseData parses a payload spanning over multiple packets and returns a set of data -func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerData, err error) { +func parseData(ps []*Packet, prs PacketsParser, pm *programMap, esm *elementaryStreamMap) (ds []*DemuxerData, err error) { // Use custom parser first if prs != nil { var skip bool @@ -80,7 +80,7 @@ func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerDa if pid == PIDCAT { // Information in a CAT payload is private and dependent on the CA system. Use the PacketsParser // to parse this type of payload - } else if isPSIPayload(pid, pm) { + } else if isPSIPayload(pid, pm, esm) { // Parse PSI data var psiData *PSIData if psiData, err = parsePSIData(i); err != nil { @@ -111,11 +111,11 @@ func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerDa } // isPSIPayload checks whether the payload is a PSI one -func isPSIPayload(pid uint16, pm *programMap) bool { +func isPSIPayload(pid uint16, pm *programMap, esm *elementaryStreamMap) bool { return pid == PIDPAT || // PAT pm.existsUnlocked(pid) || // PMT (((pid >= 0x10 && pid <= 0x14) || (pid >= 0x1e && pid <= 0x1f)) && //DVB - !pm.existsLocked(pid)) // for non-DVB + !esm.existsLocked(pid)) // for non-DVB } // isPESPayload checks whether the payload is a PES one diff --git a/data_test.go b/data_test.go index 2a8da98..2f9550f 100644 --- a/data_test.go +++ b/data_test.go @@ -11,6 +11,7 @@ import ( func TestParseData(t *testing.T) { // Init pm := newProgramMap() + esm := newElementaryStreamMap() ps := []*Packet{} // Custom parser @@ -20,13 +21,13 @@ func TestParseData(t *testing.T) { skip = true return } - ds, err := parseData(ps, c, pm) + ds, err := parseData(ps, c, pm, esm) assert.NoError(t, err) assert.Equal(t, cds, ds) // Do nothing for CAT ps = []*Packet{{Header: PacketHeader{PID: PIDCAT}}} - ds, err = parseData(ps, nil, pm) + ds, err = parseData(ps, nil, pm, esm) assert.NoError(t, err) assert.Empty(t, ds) @@ -42,7 +43,7 @@ func TestParseData(t *testing.T) { Payload: p[33:], }, } - ds, err = parseData(ps, nil, pm) + ds, err = parseData(ps, nil, pm, esm) assert.NoError(t, err) assert.Equal(t, []*DemuxerData{ { @@ -64,7 +65,7 @@ func TestParseData(t *testing.T) { Payload: p[33:], }, } - ds, err = parseData(ps, nil, pm) + ds, err = parseData(ps, nil, pm, esm) assert.NoError(t, err) assert.Equal(t, psi.toData( &Packet{Header: ps[0].Header, AdaptationField: ps[0].AdaptationField}, @@ -74,15 +75,16 @@ func TestParseData(t *testing.T) { func TestIsPSIPayload(t *testing.T) { pm := newProgramMap() + esm := newElementaryStreamMap() var pids []int for i := 0; i <= 255; i++ { - if isPSIPayload(uint16(i), pm) { + if isPSIPayload(uint16(i), pm, esm) { pids = append(pids, i) } } assert.Equal(t, []int{0, 16, 17, 18, 19, 20, 30, 31}, pids) pm.setUnlocked(uint16(1), uint16(0)) - assert.True(t, isPSIPayload(uint16(1), pm)) + assert.True(t, isPSIPayload(uint16(1), pm, esm)) } func TestIsPESPayload(t *testing.T) { diff --git a/demuxer.go b/demuxer.go index fc62b74..6fc2afa 100644 --- a/demuxer.go +++ b/demuxer.go @@ -34,6 +34,7 @@ type Demuxer struct { packetBuffer *packetBuffer packetPool *packetPool programMap *programMap + streamMap *elementaryStreamMap r io.Reader } @@ -52,6 +53,7 @@ func NewDemuxer(ctx context.Context, r io.Reader, opts ...func(*Demuxer)) (d *De ctx: ctx, l: astikit.AdaptStdLogger(nil), programMap: newProgramMap(), + streamMap: newElementaryStreamMap(), r: r, } d.packetPool = newPacketPool(d.programMap) @@ -145,7 +147,7 @@ func (dmx *Demuxer) NextData() (d *DemuxerData, err error) { // Parse data var errParseData error - if ds, errParseData = parseData(ps, dmx.optPacketsParser, dmx.programMap); errParseData != nil { + if ds, errParseData = parseData(ps, dmx.optPacketsParser, dmx.programMap, dmx.streamMap); errParseData != nil { // Log error as there may be some incomplete data here // We still want to try to parse all packets, in case final data is complete dmx.l.Error(fmt.Errorf("astits: parsing data failed: %w", errParseData)) @@ -170,7 +172,7 @@ func (dmx *Demuxer) NextData() (d *DemuxerData, err error) { } // Parse data - if ds, err = parseData(ps, dmx.optPacketsParser, dmx.programMap); err != nil { + if ds, err = parseData(ps, dmx.optPacketsParser, dmx.programMap, dmx.streamMap); err != nil { err = fmt.Errorf("astits: building new data failed: %w", err) return } @@ -201,7 +203,7 @@ func (dmx *Demuxer) updateData(ds []*DemuxerData) (d *DemuxerData) { } if v.PMT != nil { for _, es := range v.PMT.ElementaryStreams { - dmx.programMap.setLocked(es.ElementaryPID, v.PMT.ProgramNumber) + dmx.streamMap.setLocked(es.ElementaryPID, v.PMT.ProgramNumber) } } } diff --git a/program_map.go b/program_map.go index c67a2d4..60afe23 100644 --- a/program_map.go +++ b/program_map.go @@ -4,28 +4,15 @@ package astits type programMap struct { // We use map[uint32] instead map[uint16] as go runtime provide optimized hash functions for (u)int32/64 keys p map[uint32]uint16 // map[ProgramMapID]ProgramNumber - s map[uint32]uint16 // map[StreamID]ProgramNumber } // newProgramMap creates a new program ids map func newProgramMap() *programMap { return &programMap{ p: make(map[uint32]uint16), - s: make(map[uint32]uint16), } } -// setLocked sets a new stream id to the program -func (m programMap) setLocked(pid, number uint16) { - m.s[uint32(pid)] = number -} - -// existsLocked checks whether the stream with this pid exists -func (m programMap) existsLocked(pid uint16) (ok bool) { - _, ok = m.s[uint32(pid)] - return -} - // existsUnlocked checks whether the program with this pid exists func (m programMap) existsUnlocked(pid uint16) (ok bool) { _, ok = m.p[uint32(pid)] diff --git a/stream_map.go b/stream_map.go new file mode 100644 index 0000000..8f22264 --- /dev/null +++ b/stream_map.go @@ -0,0 +1,29 @@ +package astits + +// elementaryStreamMap represents an elementary stream ids map +type elementaryStreamMap struct { + // We use map[uint32] instead map[uint16] as go runtime provide optimized hash functions for (u)int32/64 keys + es map[uint32]uint16 // map[StreamID]ProgramNumber +} + +// newElementaryStreamMap creates a new elementary stream ids map +func newElementaryStreamMap() *elementaryStreamMap { + return &elementaryStreamMap{ + es: make(map[uint32]uint16), + } +} + +// setLocked sets a new stream id to the elementary stream +func (m elementaryStreamMap) setLocked(pid, number uint16) { + m.es[uint32(pid)] = number +} + +// existsLocked checks whether the stream with this pid exists +func (m elementaryStreamMap) existsLocked(pid uint16) (ok bool) { + _, ok = m.es[uint32(pid)] + return +} + +func (m elementaryStreamMap) unsetLocked(pid uint16) { + delete(m.es, uint32(pid)) +} diff --git a/stream_map_test.go b/stream_map_test.go new file mode 100644 index 0000000..21ba97d --- /dev/null +++ b/stream_map_test.go @@ -0,0 +1,16 @@ +package astits + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestElementaryStreamMap(t *testing.T) { + esm := newElementaryStreamMap() + assert.False(t, esm.existsLocked(0x16)) + esm.setLocked(0x16, 1) + assert.True(t, esm.existsLocked(0x16)) + esm.unsetLocked(0x16) + assert.False(t, esm.existsLocked(0x16)) +}