diff --git a/constants.go b/constants.go index 17e8215cb9e..6f597b0cb0e 100644 --- a/constants.go +++ b/constants.go @@ -10,5 +10,10 @@ const ( // Equal to UDP MTU receiveMTU = 1460 + // simulcastProbeCount is the amount of RTP Packets + // that handleUndeclaredSSRC will read and try to dispatch from + // mid and rid values + simulcastProbeCount = 10 + mediaSectionApplication = "application" ) diff --git a/examples/simulcast/main.go b/examples/simulcast/main.go index 1217ba8e6ef..e15f809ab82 100644 --- a/examples/simulcast/main.go +++ b/examples/simulcast/main.go @@ -3,7 +3,6 @@ package main import ( "fmt" "io" - "log" "math/rand" "net/url" "time" @@ -103,17 +102,13 @@ func main() { panic(err) } - // fmt.Printf("offer: %s\n", offer.SDP) - // Set the remote SessionDescription - err = peerConnection.SetRemoteDescription(offer) - if err != nil { + if err = peerConnection.SetRemoteDescription(offer); err != nil { panic(err) } // Set a handler for when a new remote track starts peerConnection.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) { - fmt.Printf("Track has started\n") - log.Println("Track has started", track) + fmt.Println("Track has started") // Start reading from all the streams and sending them to the related output track rid := track.RID() @@ -132,10 +127,9 @@ func main() { } }() for { - var readErr error // Read RTP packets being sent to Pion packet, readErr := track.ReadRTP() - if err != nil { + if readErr != nil { panic(readErr) } @@ -157,8 +151,6 @@ func main() { panic(err) } - fmt.Printf("answer: %s\n", answer.SDP) - // Sets the LocalDescription, and starts our UDP listeners err = peerConnection.SetLocalDescription(answer) if err != nil { diff --git a/peerconnection.go b/peerconnection.go index 1aa408dd95e..1728772dbcf 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -8,6 +8,7 @@ import ( "crypto/elliptic" "crypto/rand" "fmt" + "io" "strconv" "strings" "sync" @@ -877,24 +878,35 @@ func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error { } func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPReceiver) { - err := receiver.Receive(RTPReceiveParameters{ - Encodings: RTPDecodingParameters{ - RTPCodingParameters{SSRC: incoming.ssrc}, - }}) - if err != nil { + encodings := []RTPDecodingParameters{} + if incoming.ssrc != 0 { + encodings = append(encodings, RTPDecodingParameters{RTPCodingParameters{SSRC: incoming.ssrc}}) + } + for _, rid := range incoming.rids { + encodings = append(encodings, RTPDecodingParameters{RTPCodingParameters{RID: rid}}) + } + + if err := receiver.Receive(RTPReceiveParameters{Encodings: encodings}); err != nil { pc.log.Warnf("RTPReceiver Receive failed %s", err) return } // set track id and label early so they can be set as new track information // is received from the SDP. - receiver.Track().mu.Lock() - receiver.Track().id = incoming.id - receiver.Track().label = incoming.label - receiver.Track().mu.Unlock() + for i := range receiver.tracks { + receiver.tracks[i].track.mu.Lock() + receiver.tracks[i].track.id = incoming.id + receiver.tracks[i].track.label = incoming.label + receiver.tracks[i].track.mu.Unlock() + } + + // We can't block and wait for a single SSRC + if incoming.ssrc == 0 { + return + } go func() { - if err = receiver.Track().determinePayloadType(); err != nil { + if err := receiver.Track().determinePayloadType(); err != nil { pc.log.Warnf("Could not determine PayloadType for SSRC %d", receiver.Track().SSRC()) return } @@ -922,7 +934,7 @@ func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPRece } // startRTPReceivers opens knows inbound SRTP streams from the RemoteDescription -func (pc *PeerConnection) startRTPReceivers(incomingTracks map[uint32]trackDetails, currentTransceivers []*RTPTransceiver) { +func (pc *PeerConnection) startRTPReceivers(incomingTracks []trackDetails, currentTransceivers []*RTPTransceiver) { localTransceivers := append([]*RTPTransceiver{}, currentTransceivers...) remoteIsPlanB := false @@ -934,45 +946,54 @@ func (pc *PeerConnection) startRTPReceivers(incomingTracks map[uint32]trackDetai } // Ensure we haven't already started a transceiver for this ssrc - for ssrc := range incomingTracks { - for i := range localTransceivers { - if t := localTransceivers[i]; (t.Receiver()) == nil || t.Receiver().Track() == nil || t.Receiver().Track().ssrc != ssrc { + for i := range incomingTracks { + if len(incomingTracks) <= i { + break + } + incomingTrack := incomingTracks[i] + + for _, t := range localTransceivers { + if (t.Receiver()) == nil || t.Receiver().Track() == nil || t.Receiver().Track().ssrc != incomingTrack.ssrc { continue } - delete(incomingTracks, ssrc) + incomingTracks = filterTrackWithSSRC(incomingTracks, incomingTrack.ssrc) } } - for ssrc, incoming := range incomingTracks { - for i := range localTransceivers { - t := localTransceivers[i] + for i := range incomingTracks { + for j := range localTransceivers { + if len(incomingTracks) <= i || len(localTransceivers) <= j { + break + } + t := localTransceivers[j] + incomingTrack := incomingTracks[i] - if t.Mid() != incoming.mid { + if t.Mid() != incomingTrack.mid { continue } - if (incomingTracks[ssrc].kind != t.kind) || + if (incomingTrack.kind != t.kind) || (t.Direction() != RTPTransceiverDirectionRecvonly && t.Direction() != RTPTransceiverDirectionSendrecv) || (t.Receiver()) == nil || (t.Receiver().haveReceived()) { continue } - delete(incomingTracks, ssrc) - localTransceivers = append(localTransceivers[:i], localTransceivers[i+1:]...) - pc.startReceiver(incoming, t.Receiver()) + incomingTracks = append(incomingTracks[:i], incomingTracks[i+1:]...) + localTransceivers = append(localTransceivers[:j], localTransceivers[j+1:]...) + pc.startReceiver(incomingTrack, t.Receiver()) break } } if remoteIsPlanB { - for ssrc, incoming := range incomingTracks { + for _, incoming := range incomingTracks { t, err := pc.AddTransceiverFromKind(incoming.kind, RtpTransceiverInit{ Direction: RTPTransceiverDirectionSendrecv, }) if err != nil { - pc.log.Warnf("Could not add transceiver for remote SSRC %d: %s", ssrc, err) + pc.log.Warnf("Could not add transceiver for remote SSRC %d: %s", incoming.ssrc, err) continue } pc.startReceiver(incoming, t.Receiver()) @@ -1036,58 +1057,115 @@ func (pc *PeerConnection) startSCTP() { pc.sctpTransport.lock.Unlock() } -// drainSRTP pulls and discards RTP/RTCP packets that don't match any a:ssrc lines -// If the remote SDP was only one media section the ssrc doesn't have to be explicitly declared -func (pc *PeerConnection) drainSRTP() { - handleUndeclaredSSRC := func(ssrc uint32) bool { - if remoteDescription := pc.RemoteDescription(); remoteDescription != nil { - if len(remoteDescription.parsed.MediaDescriptions) == 1 { - onlyMediaSection := remoteDescription.parsed.MediaDescriptions[0] - for _, a := range onlyMediaSection.Attributes { - if a.Key == ssrcStr { - return false - } - } - - incoming := trackDetails{ - ssrc: ssrc, - kind: RTPCodecTypeVideo, - } - if onlyMediaSection.MediaName.Media == RTPCodecTypeAudio.String() { - incoming.kind = RTPCodecTypeAudio - } +func (pc *PeerConnection) handleUndeclaredSSRC(rtpStream io.Reader, ssrc uint32) error { + remoteDescription := pc.RemoteDescription() + if remoteDescription == nil { + return fmt.Errorf("remote Description has not been set yet") + } - t, err := pc.AddTransceiverFromKind(incoming.kind, RtpTransceiverInit{ - Direction: RTPTransceiverDirectionSendrecv, - }) - if err != nil { - pc.log.Warnf("Could not add transceiver for remote SSRC %d: %s", ssrc, err) - return false - } - pc.startReceiver(incoming, t.Receiver()) - return true + // If the remote SDP was only one media section the ssrc doesn't have to be explicitly declared + if len(remoteDescription.parsed.MediaDescriptions) == 1 { + onlyMediaSection := remoteDescription.parsed.MediaDescriptions[0] + for _, a := range onlyMediaSection.Attributes { + if a.Key == ssrcStr { + return fmt.Errorf("single media section has an explicit SSRC") } } - return false + incoming := trackDetails{ + ssrc: ssrc, + kind: RTPCodecTypeVideo, + } + if onlyMediaSection.MediaName.Media == RTPCodecTypeAudio.String() { + incoming.kind = RTPCodecTypeAudio + } + + t, err := pc.AddTransceiverFromKind(incoming.kind, RtpTransceiverInit{ + Direction: RTPTransceiverDirectionSendrecv, + }) + if err != nil { + return fmt.Errorf("could not add transceiver for remote SSRC %d: %s", ssrc, err) + } + pc.startReceiver(incoming, t.Receiver()) + return nil + } + + // Simulcast no longer uses SSRCes, but RID instead. We then use that value to populate rest of Track Data + matchedSDPMap, err := matchedAnswerExt(pc.RemoteDescription().parsed, pc.api.settingEngine.getSDPExtensions()) + if err != nil { + return err + } + + sdesMidExtMap := getExtMapByURI(matchedSDPMap, sdp.SDESMidURI) + sdesStreamIDExtMap := getExtMapByURI(matchedSDPMap, sdp.SDESRTPStreamIDURI) + if sdesMidExtMap == nil || sdesStreamIDExtMap == nil { + return fmt.Errorf("mid and rid RTP Extensions required for Simulcast") + } + + b := make([]byte, receiveMTU) + var mid, rid string + for readCount := 0; readCount <= simulcastProbeCount; readCount++ { + i, err := rtpStream.Read(b) + if err != nil { + return err + } + + maybeMid, maybeRid, payloadType, err := handleUnknownRTPPacket(b[:i], sdesMidExtMap, sdesStreamIDExtMap) + if err != nil { + return err + } + + if maybeMid != "" { + mid = maybeMid + } + if maybeRid != "" { + rid = maybeRid + } + + if mid == "" || rid == "" { + continue + } + + codec, err := pc.api.mediaEngine.getCodec(payloadType) + if err != nil { + return err + } + + for _, t := range pc.GetTransceivers() { + if t.Mid() != mid || t.Receiver() == nil { + continue + } + + track, err := t.Receiver().receiveForRid(rid, codec, ssrc) + if err != nil { + return err + } + pc.onTrack(track, t.Receiver()) + return nil + } } + return fmt.Errorf("incoming SSRC failed Simulcast probing") +} + +// undeclaredMediaProcessor handles RTP/RTCP packets that don't match any a:ssrc lines +func (pc *PeerConnection) undeclaredMediaProcessor() { go func() { for { srtpSession, err := pc.dtlsTransport.getSRTPSession() if err != nil { - pc.log.Warnf("drainSRTP failed to open SrtpSession: %v", err) + pc.log.Warnf("undeclaredMediaProcessor failed to open SrtpSession: %v", err) return } - _, ssrc, err := srtpSession.AcceptStream() + stream, ssrc, err := srtpSession.AcceptStream() if err != nil { pc.log.Warnf("Failed to accept RTP %v", err) return } - if !handleUndeclaredSSRC(ssrc) { - pc.log.Warnf("Incoming unhandled RTP ssrc(%d), OnTrack will not be fired", ssrc) + if err := pc.handleUndeclaredSSRC(stream, ssrc); err != nil { + pc.log.Errorf("Incoming unhandled RTP ssrc(%d), OnTrack will not be fired. %v", ssrc, err) } } }() @@ -1096,7 +1174,7 @@ func (pc *PeerConnection) drainSRTP() { for { srtcpSession, err := pc.dtlsTransport.getSRTCPSession() if err != nil { - pc.log.Warnf("drainSRTP failed to open SrtcpSession: %v", err) + pc.log.Warnf("undeclaredMediaProcessor failed to open SrtcpSession: %v", err) return } @@ -1694,13 +1772,14 @@ func (pc *PeerConnection) startRTP(isRenegotiation bool, remoteDesc *SessionDesc t.Receiver().Track().mu.Lock() ssrc := t.Receiver().Track().ssrc - if _, ok := trackDetails[ssrc]; ok { - incoming := trackDetails[ssrc] - t.Receiver().Track().id = incoming.id - t.Receiver().Track().label = incoming.label + + if details := trackDetailsForSSRC(trackDetails, ssrc); details != nil { + t.Receiver().Track().id = details.id + t.Receiver().Track().label = details.label t.Receiver().Track().mu.Unlock() continue } + t.Receiver().Track().mu.Unlock() if err := t.Receiver().Stop(); err != nil { @@ -1721,7 +1800,7 @@ func (pc *PeerConnection) startRTP(isRenegotiation bool, remoteDesc *SessionDesc pc.startRTPSenders(currentTransceivers) if !isRenegotiation { - pc.drainSRTP() + pc.undeclaredMediaProcessor() if haveApplicationMediaSection(remoteDesc.parsed) { pc.startSCTP() } diff --git a/rtpcodingparameters.go b/rtpcodingparameters.go index a74c9f4e3ff..1c2965710a7 100644 --- a/rtpcodingparameters.go +++ b/rtpcodingparameters.go @@ -4,6 +4,7 @@ package webrtc // This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself // http://draft.ortc.org/#dom-rtcrtpcodingparameters type RTPCodingParameters struct { + RID string `json:"rid"` SSRC uint32 `json:"ssrc"` PayloadType uint8 `json:"payloadType"` } diff --git a/rtpreceiveparameters.go b/rtpreceiveparameters.go index be9a5901e1e..badf6b733d3 100644 --- a/rtpreceiveparameters.go +++ b/rtpreceiveparameters.go @@ -2,5 +2,5 @@ package webrtc // RTPReceiveParameters contains the RTP stack settings used by receivers type RTPReceiveParameters struct { - Encodings RTPDecodingParameters + Encodings []RTPDecodingParameters } diff --git a/rtpreceiver.go b/rtpreceiver.go index 276af151f3c..d250dde78d8 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -11,19 +11,24 @@ import ( "github.com/pion/srtp" ) +// trackStreams maintains a mapping of RTP/RTCP streams to a specific track +// a RTPReceiver may contain multiple streams if we are dealing with Multicast +type trackStreams struct { + track *Track + rtpReadStream *srtp.ReadStreamSRTP + rtcpReadStream *srtp.ReadStreamSRTCP +} + // RTPReceiver allows an application to inspect the receipt of a Track type RTPReceiver struct { kind RTPCodecType transport *DTLSTransport - track *Track + tracks []trackStreams closed, received chan interface{} mu sync.RWMutex - rtpReadStream *srtp.ReadStreamSRTP - rtcpReadStream *srtp.ReadStreamSRTCP - // A reference to the associated api object api *API } @@ -40,6 +45,7 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT api: api, closed: make(chan interface{}), received: make(chan interface{}), + tracks: []trackStreams{}, }, nil } @@ -51,11 +57,28 @@ func (r *RTPReceiver) Transport() *DTLSTransport { return r.transport } -// Track returns the RTCRtpTransceiver track +// Track returns the RtpTransceiver track func (r *RTPReceiver) Track() *Track { r.mu.RLock() defer r.mu.RUnlock() - return r.track + + if len(r.tracks) != 1 { + return nil + } + return r.tracks[0].track +} + +// Tracks returns the RtpTransceiver tracks +// A RTPReceiver to support Simulcast may now have multiple tracks +func (r *RTPReceiver) Tracks() []*Track { + r.mu.RLock() + defer r.mu.RUnlock() + + tracks := []*Track{} + for i := range r.tracks { + tracks = append(tracks, r.tracks[i].track) + } + return tracks } // Receive initialize the track and starts all the transports @@ -69,30 +92,32 @@ func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error { } defer close(r.received) - r.track = &Track{ - kind: r.kind, - ssrc: parameters.Encodings.SSRC, - receiver: r, - } - - srtpSession, err := r.transport.getSRTPSession() - if err != nil { - return err - } - - r.rtpReadStream, err = srtpSession.OpenReadStream(parameters.Encodings.SSRC) - if err != nil { - return err - } + if len(parameters.Encodings) == 1 && parameters.Encodings[0].SSRC != 0 { + t := trackStreams{ + track: &Track{ + kind: r.kind, + ssrc: parameters.Encodings[0].SSRC, + receiver: r, + }, + } - srtcpSession, err := r.transport.getSRTCPSession() - if err != nil { - return err - } + var err error + t.rtpReadStream, t.rtcpReadStream, err = r.streamsForSSRC(parameters.Encodings[0].SSRC) + if err != nil { + return err + } - r.rtcpReadStream, err = srtcpSession.OpenReadStream(parameters.Encodings.SSRC) - if err != nil { - return err + r.tracks = append(r.tracks, t) + } else { + for _, encoding := range parameters.Encodings { + r.tracks = append(r.tracks, trackStreams{ + track: &Track{ + kind: r.kind, + rid: encoding.RID, + receiver: r, + }, + }) + } } return nil @@ -102,7 +127,7 @@ func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error { func (r *RTPReceiver) Read(b []byte) (n int, err error) { select { case <-r.received: - return r.rtcpReadStream.Read(b) + return r.tracks[0].rtcpReadStream.Read(b) case <-r.closed: return 0, io.ErrClosedPipe } @@ -141,13 +166,11 @@ func (r *RTPReceiver) Stop() error { select { case <-r.received: - if r.rtcpReadStream != nil { - if err := r.rtcpReadStream.Close(); err != nil { + for i := range r.tracks { + if err := r.tracks[i].rtcpReadStream.Close(); err != nil { return err } - } - if r.rtpReadStream != nil { - if err := r.rtpReadStream.Close(); err != nil { + if err := r.tracks[i].rtpReadStream.Close(); err != nil { return err } } @@ -158,8 +181,72 @@ func (r *RTPReceiver) Stop() error { return nil } +func (r *RTPReceiver) streamsForTrack(t *Track) *trackStreams { + for i := range r.tracks { + if r.tracks[i].track == t { + return &r.tracks[i] + } + } + return nil +} + // readRTP should only be called by a track, this only exists so we can keep state in one place -func (r *RTPReceiver) readRTP(b []byte) (n int, err error) { +func (r *RTPReceiver) readRTP(b []byte, reader *Track) (n int, err error) { <-r.received - return r.rtpReadStream.Read(b) + if t := r.streamsForTrack(reader); t != nil { + return t.rtpReadStream.Read(b) + } + + return 0, fmt.Errorf("unable to find stream for Track with SSRC(%d)", reader.SSRC()) +} + +// receiveForRid is the sibling of Receive expect for RIDs instead of SSRCs +// It populates all the internal state for the given RID +func (r *RTPReceiver) receiveForRid(rid string, codec *RTPCodec, ssrc uint32) (*Track, error) { + r.mu.Lock() + defer r.mu.Unlock() + + for i := range r.tracks { + if r.tracks[i].track.RID() == rid { + r.tracks[i].track.mu.Lock() + r.tracks[i].track.kind = codec.Type + r.tracks[i].track.codec = codec + r.tracks[i].track.ssrc = ssrc + r.tracks[i].track.mu.Unlock() + + var err error + r.tracks[i].rtpReadStream, r.tracks[i].rtcpReadStream, err = r.streamsForSSRC(ssrc) + if err != nil { + return nil, err + } + + return r.tracks[i].track, nil + } + } + + return nil, fmt.Errorf("no trackStreams found for SSRC(%d)", ssrc) +} + +func (r *RTPReceiver) streamsForSSRC(ssrc uint32) (*srtp.ReadStreamSRTP, *srtp.ReadStreamSRTCP, error) { + srtpSession, err := r.transport.getSRTPSession() + if err != nil { + return nil, nil, err + } + + rtpReadStream, err := srtpSession.OpenReadStream(ssrc) + if err != nil { + return nil, nil, err + } + + srtcpSession, err := r.transport.getSRTCPSession() + if err != nil { + return nil, nil, err + } + + rtcpReadStream, err := srtcpSession.OpenReadStream(ssrc) + if err != nil { + return nil, nil, err + } + + return rtpReadStream, rtcpReadStream, nil } diff --git a/rtptransceiver.go b/rtptransceiver.go index af094bf6992..0a0b84f4cb1 100644 --- a/rtptransceiver.go +++ b/rtptransceiver.go @@ -5,6 +5,9 @@ package webrtc import ( "fmt" "sync/atomic" + + "github.com/pion/rtp" + "github.com/pion/sdp/v2" ) // RTPTransceiver represents a combination of an RTPSender and an RTPReceiver that share a common mid. @@ -150,3 +153,27 @@ func satisfyTypeAndDirection(remoteKind RTPCodecType, remoteDirection RTPTransce return nil, localTransceivers } + +// handleUnknownRTPPacket consumes a single RTP Packet and returns information that is helpful +// for demuxing and handling an unknown SSRC (usually for Simulcast) +func handleUnknownRTPPacket(buf []byte, sdesMidExtMap, sdesStreamIDExtMap *sdp.ExtMap) (mid, rid string, payloadType uint8, err error) { + rp := &rtp.Packet{} + if err = rp.Unmarshal(buf); err != nil { + return + } + + if !rp.Header.Extension { + return + } + + payloadType = rp.PayloadType + if payload := rp.GetExtension(uint8(sdesMidExtMap.Value)); payload != nil { + mid = string(payload) + } + + if payload := rp.GetExtension(uint8(sdesStreamIDExtMap.Value)); payload != nil { + rid = string(payload) + } + + return +} diff --git a/sdp.go b/sdp.go index abff06d02b2..39b4f4f6e37 100644 --- a/sdp.go +++ b/sdp.go @@ -12,12 +12,34 @@ import ( "github.com/pion/sdp/v2" ) +// trackDetails represents any media source that can be represented in a SDP +// This isn't keyed by SSRC because it also needs to support rid based sources type trackDetails struct { mid string kind RTPCodecType label string id string ssrc uint32 + rids []string +} + +func trackDetailsForSSRC(trackDetails []trackDetails, ssrc uint32) *trackDetails { + for i := range trackDetails { + if trackDetails[i].ssrc == ssrc { + return &trackDetails[i] + } + } + return nil +} + +func filterTrackWithSSRC(incomingTracks []trackDetails, ssrc uint32) []trackDetails { + filtered := []trackDetails{} + for i := range incomingTracks { + if incomingTracks[i].ssrc != ssrc { + filtered = append(filtered, incomingTracks[i]) + } + } + return filtered } // SDPSectionType specifies media type sections @@ -31,8 +53,8 @@ const ( ) // extract all trackDetails from an SDP. -func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) map[uint32]trackDetails { - incomingTracks := map[uint32]trackDetails{} +func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) []trackDetails { + incomingTracks := []trackDetails{} rtxRepairFlows := map[uint32]bool{} for _, media := range s.MediaDescriptions { @@ -78,7 +100,7 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) m continue } rtxRepairFlows[uint32(rtxRepairFlow)] = true - delete(incomingTracks, uint32(rtxRepairFlow)) // Remove if rtx was added as track before + incomingTracks = filterTrackWithSSRC(incomingTracks, uint32(rtxRepairFlow)) // Remove if rtx was added as track before } } @@ -99,31 +121,52 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) m log.Warnf("Failed to parse SSRC: %v", err) continue } + if rtxRepairFlow := rtxRepairFlows[uint32(ssrc)]; rtxRepairFlow { continue // This ssrc is a RTX repair flow, ignore } - if existingValues, ok := incomingTracks[uint32(ssrc)]; ok && existingValues.label != "" && existingValues.id != "" { - continue // This ssrc is already fully defined - } if len(split) == 3 && strings.HasPrefix(split[1], "msid:") { trackLabel = split[1][len("msid:"):] trackID = split[2] } - // Plan B might send multiple a=ssrc lines under a single m= section. This is also why a single trackDetails{} - // is not defined at the top of the loop over s.MediaDescriptions. - incomingTracks[uint32(ssrc)] = trackDetails{ - mid: midValue, - kind: codecType, - label: trackLabel, - id: trackID, - ssrc: uint32(ssrc), + isNewTrack := true + trackDetails := &trackDetails{} + for i := range incomingTracks { + if incomingTracks[i].ssrc == uint32(ssrc) { + trackDetails = &incomingTracks[i] + isNewTrack = false + } + } + + trackDetails.mid = midValue + trackDetails.kind = codecType + trackDetails.label = trackLabel + trackDetails.id = trackID + trackDetails.ssrc = uint32(ssrc) + + if isNewTrack { + incomingTracks = append(incomingTracks, *trackDetails) } } } - } + if rids := getRids(media); len(rids) != 0 && trackID != "" && trackLabel != "" { + newTrack := trackDetails{ + mid: midValue, + kind: codecType, + label: trackLabel, + id: trackID, + rids: []string{}, + } + for rid := range rids { + newTrack.rids = append(newTrack.rids, rid) + } + + incomingTracks = append(incomingTracks, newTrack) + } + } return incomingTracks } @@ -269,8 +312,15 @@ func addTransceiverSDP(d *sdp.SessionDescription, isPlanB bool, dtlsFingerprints } } - for rid := range mediaSection.ridMap { - media.WithValueAttribute("rid", rid+" recv") + if len(mediaSection.ridMap) > 0 { + recvRids := make([]string, 0, len(mediaSection.ridMap)) + + for rid := range mediaSection.ridMap { + media.WithValueAttribute("rid", rid+" recv") + recvRids = append(recvRids, rid) + } + // Simulcast + media.WithValueAttribute("simulcast", "recv "+strings.Join(recvRids, ";")) } for _, mt := range transceivers { @@ -561,3 +611,21 @@ func remoteExts(session *sdp.SessionDescription) (map[SDPSectionType]map[int]sdp } return remoteExtMaps, nil } + +// GetExtMapByURI return a copy of the extmap matching the provided +// URI. Note that the extmap value will change if not yet negotiated +func getExtMapByURI(exts map[SDPSectionType][]sdp.ExtMap, uri string) *sdp.ExtMap { + for _, extList := range exts { + for _, extMap := range extList { + if extMap.URI.String() == uri { + return &sdp.ExtMap{ + Value: extMap.Value, + Direction: extMap.Direction, + URI: extMap.URI, + ExtAttr: extMap.ExtAttr, + } + } + } + } + return nil +} diff --git a/sdp_test.go b/sdp_test.go index 1c62d7fcaf5..89c1b71ca5f 100644 --- a/sdp_test.go +++ b/sdp_test.go @@ -213,27 +213,27 @@ func TestTrackDetailsFromSDP(t *testing.T) { tracks := trackDetailsFromSDP(nil, s) assert.Equal(t, 3, len(tracks)) - if _, ok := tracks[1000]; ok { + if trackDetail := trackDetailsForSSRC(tracks, 1000); trackDetail != nil { assert.Fail(t, "got the unknown track ssrc:1000 which should have been skipped") } - if track, ok := tracks[2000]; !ok { + if track := trackDetailsForSSRC(tracks, 2000); track == nil { assert.Fail(t, "missing audio track with ssrc:2000") } else { assert.Equal(t, RTPCodecTypeAudio, track.kind) assert.Equal(t, uint32(2000), track.ssrc) assert.Equal(t, "audio_trk_label", track.label) } - if track, ok := tracks[3000]; !ok { + if track := trackDetailsForSSRC(tracks, 3000); track == nil { assert.Fail(t, "missing video track with ssrc:3000") } else { assert.Equal(t, RTPCodecTypeVideo, track.kind) assert.Equal(t, uint32(3000), track.ssrc) assert.Equal(t, "video_trk_label", track.label) } - if _, ok := tracks[4000]; ok { + if track := trackDetailsForSSRC(tracks, 4000); track != nil { assert.Fail(t, "got the rtx track ssrc:3000 which should have been skipped") } - if track, ok := tracks[5000]; !ok { + if track := trackDetailsForSSRC(tracks, 5000); track == nil { assert.Fail(t, "missing video track with ssrc:5000") } else { assert.Equal(t, RTPCodecTypeVideo, track.kind) diff --git a/track.go b/track.go index 2a48777a568..7c666b0f822 100644 --- a/track.go +++ b/track.go @@ -27,6 +27,7 @@ type Track struct { label string ssrc uint32 codec *RTPCodec + rid string packetizer rtp.Packetizer @@ -42,6 +43,16 @@ func (t *Track) ID() string { return t.id } +// RID gets the RTP Stream ID of this Track +// With Simulcast you will have multiple tracks with the same ID, but different RID values. +// In many cases a Track will not have an RID, so it is important to assert it is non-zero +func (t *Track) RID() string { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.rid +} + // PayloadType gets the PayloadType of the track func (t *Track) PayloadType() uint8 { t.mu.RLock() @@ -95,7 +106,7 @@ func (t *Track) Read(b []byte) (n int, err error) { } t.mu.RUnlock() - return r.readRTP(b) + return r.readRTP(b, t) } // ReadRTP is a convenience method that wraps Read and unmarshals for you