Skip to content

Commit

Permalink
Add Simulcast support
Browse files Browse the repository at this point in the history
Resolves #1016
  • Loading branch information
jbrady42 authored and Sean-Der committed Jul 24, 2020
1 parent 570ddd0 commit 6ee528d
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 136 deletions.
5 changes: 5 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,10 @@ const (
// Equal to UDP MTU
receiveMTU = 1460

// simulcastProbeCount is the amount of RTP Packets
// that handleUndeclaredSSRC will read and try to dispatch from
// mid and rid values
simulcastProbeCount = 10

mediaSectionApplication = "application"
)
14 changes: 3 additions & 11 deletions examples/simulcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"fmt"
"io"
"log"
"math/rand"
"net/url"
"time"
Expand Down Expand Up @@ -103,17 +102,13 @@ func main() {
panic(err)
}

// fmt.Printf("offer: %s\n", offer.SDP)
// Set the remote SessionDescription
err = peerConnection.SetRemoteDescription(offer)
if err != nil {
if err = peerConnection.SetRemoteDescription(offer); err != nil {
panic(err)
}

// Set a handler for when a new remote track starts
peerConnection.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) {
fmt.Printf("Track has started\n")
log.Println("Track has started", track)
fmt.Println("Track has started")

// Start reading from all the streams and sending them to the related output track
rid := track.RID()
Expand All @@ -132,10 +127,9 @@ func main() {
}
}()
for {
var readErr error
// Read RTP packets being sent to Pion
packet, readErr := track.ReadRTP()
if err != nil {
if readErr != nil {
panic(readErr)
}

Expand All @@ -157,8 +151,6 @@ func main() {
panic(err)
}

fmt.Printf("answer: %s\n", answer.SDP)

// Sets the LocalDescription, and starts our UDP listeners
err = peerConnection.SetLocalDescription(answer)
if err != nil {
Expand Down
209 changes: 144 additions & 65 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/elliptic"
"crypto/rand"
"fmt"
"io"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -877,24 +878,35 @@ func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error {
}

func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPReceiver) {
err := receiver.Receive(RTPReceiveParameters{
Encodings: RTPDecodingParameters{
RTPCodingParameters{SSRC: incoming.ssrc},
}})
if err != nil {
encodings := []RTPDecodingParameters{}
if incoming.ssrc != 0 {
encodings = append(encodings, RTPDecodingParameters{RTPCodingParameters{SSRC: incoming.ssrc}})
}
for _, rid := range incoming.rids {
encodings = append(encodings, RTPDecodingParameters{RTPCodingParameters{RID: rid}})
}

if err := receiver.Receive(RTPReceiveParameters{Encodings: encodings}); err != nil {
pc.log.Warnf("RTPReceiver Receive failed %s", err)
return
}

// set track id and label early so they can be set as new track information
// is received from the SDP.
receiver.Track().mu.Lock()
receiver.Track().id = incoming.id
receiver.Track().label = incoming.label
receiver.Track().mu.Unlock()
for i := range receiver.tracks {
receiver.tracks[i].track.mu.Lock()
receiver.tracks[i].track.id = incoming.id
receiver.tracks[i].track.label = incoming.label
receiver.tracks[i].track.mu.Unlock()
}

// We can't block and wait for a single SSRC
if incoming.ssrc == 0 {
return
}

go func() {
if err = receiver.Track().determinePayloadType(); err != nil {
if err := receiver.Track().determinePayloadType(); err != nil {
pc.log.Warnf("Could not determine PayloadType for SSRC %d", receiver.Track().SSRC())
return
}
Expand Down Expand Up @@ -922,7 +934,7 @@ func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPRece
}

// startRTPReceivers opens knows inbound SRTP streams from the RemoteDescription
func (pc *PeerConnection) startRTPReceivers(incomingTracks map[uint32]trackDetails, currentTransceivers []*RTPTransceiver) {
func (pc *PeerConnection) startRTPReceivers(incomingTracks []trackDetails, currentTransceivers []*RTPTransceiver) {
localTransceivers := append([]*RTPTransceiver{}, currentTransceivers...)

remoteIsPlanB := false
Expand All @@ -934,45 +946,54 @@ func (pc *PeerConnection) startRTPReceivers(incomingTracks map[uint32]trackDetai
}

// Ensure we haven't already started a transceiver for this ssrc
for ssrc := range incomingTracks {
for i := range localTransceivers {
if t := localTransceivers[i]; (t.Receiver()) == nil || t.Receiver().Track() == nil || t.Receiver().Track().ssrc != ssrc {
for i := range incomingTracks {
if len(incomingTracks) <= i {
break
}
incomingTrack := incomingTracks[i]

for _, t := range localTransceivers {
if (t.Receiver()) == nil || t.Receiver().Track() == nil || t.Receiver().Track().ssrc != incomingTrack.ssrc {
continue
}

delete(incomingTracks, ssrc)
incomingTracks = filterTrackWithSSRC(incomingTracks, incomingTrack.ssrc)
}
}

for ssrc, incoming := range incomingTracks {
for i := range localTransceivers {
t := localTransceivers[i]
for i := range incomingTracks {
for j := range localTransceivers {
if len(incomingTracks) <= i || len(localTransceivers) <= j {
break
}
t := localTransceivers[j]
incomingTrack := incomingTracks[i]

if t.Mid() != incoming.mid {
if t.Mid() != incomingTrack.mid {
continue
}

if (incomingTracks[ssrc].kind != t.kind) ||
if (incomingTrack.kind != t.kind) ||
(t.Direction() != RTPTransceiverDirectionRecvonly && t.Direction() != RTPTransceiverDirectionSendrecv) ||
(t.Receiver()) == nil ||
(t.Receiver().haveReceived()) {
continue
}

delete(incomingTracks, ssrc)
localTransceivers = append(localTransceivers[:i], localTransceivers[i+1:]...)
pc.startReceiver(incoming, t.Receiver())
incomingTracks = append(incomingTracks[:i], incomingTracks[i+1:]...)
localTransceivers = append(localTransceivers[:j], localTransceivers[j+1:]...)
pc.startReceiver(incomingTrack, t.Receiver())
break
}
}

if remoteIsPlanB {
for ssrc, incoming := range incomingTracks {
for _, incoming := range incomingTracks {
t, err := pc.AddTransceiverFromKind(incoming.kind, RtpTransceiverInit{
Direction: RTPTransceiverDirectionSendrecv,
})
if err != nil {
pc.log.Warnf("Could not add transceiver for remote SSRC %d: %s", ssrc, err)
pc.log.Warnf("Could not add transceiver for remote SSRC %d: %s", incoming.ssrc, err)
continue
}
pc.startReceiver(incoming, t.Receiver())
Expand Down Expand Up @@ -1036,58 +1057,115 @@ func (pc *PeerConnection) startSCTP() {
pc.sctpTransport.lock.Unlock()
}

// drainSRTP pulls and discards RTP/RTCP packets that don't match any a:ssrc lines
// If the remote SDP was only one media section the ssrc doesn't have to be explicitly declared
func (pc *PeerConnection) drainSRTP() {
handleUndeclaredSSRC := func(ssrc uint32) bool {
if remoteDescription := pc.RemoteDescription(); remoteDescription != nil {
if len(remoteDescription.parsed.MediaDescriptions) == 1 {
onlyMediaSection := remoteDescription.parsed.MediaDescriptions[0]
for _, a := range onlyMediaSection.Attributes {
if a.Key == ssrcStr {
return false
}
}

incoming := trackDetails{
ssrc: ssrc,
kind: RTPCodecTypeVideo,
}
if onlyMediaSection.MediaName.Media == RTPCodecTypeAudio.String() {
incoming.kind = RTPCodecTypeAudio
}
func (pc *PeerConnection) handleUndeclaredSSRC(rtpStream io.Reader, ssrc uint32) error {
remoteDescription := pc.RemoteDescription()
if remoteDescription == nil {
return fmt.Errorf("remote Description has not been set yet")
}

t, err := pc.AddTransceiverFromKind(incoming.kind, RtpTransceiverInit{
Direction: RTPTransceiverDirectionSendrecv,
})
if err != nil {
pc.log.Warnf("Could not add transceiver for remote SSRC %d: %s", ssrc, err)
return false
}
pc.startReceiver(incoming, t.Receiver())
return true
// If the remote SDP was only one media section the ssrc doesn't have to be explicitly declared
if len(remoteDescription.parsed.MediaDescriptions) == 1 {
onlyMediaSection := remoteDescription.parsed.MediaDescriptions[0]
for _, a := range onlyMediaSection.Attributes {
if a.Key == ssrcStr {
return fmt.Errorf("single media section has an explicit SSRC")
}
}

return false
incoming := trackDetails{
ssrc: ssrc,
kind: RTPCodecTypeVideo,
}
if onlyMediaSection.MediaName.Media == RTPCodecTypeAudio.String() {
incoming.kind = RTPCodecTypeAudio
}

t, err := pc.AddTransceiverFromKind(incoming.kind, RtpTransceiverInit{
Direction: RTPTransceiverDirectionSendrecv,
})
if err != nil {
return fmt.Errorf("could not add transceiver for remote SSRC %d: %s", ssrc, err)
}
pc.startReceiver(incoming, t.Receiver())
return nil
}

// Simulcast no longer uses SSRCes, but RID instead. We then use that value to populate rest of Track Data
matchedSDPMap, err := matchedAnswerExt(pc.RemoteDescription().parsed, pc.api.settingEngine.getSDPExtensions())
if err != nil {
return err
}

sdesMidExtMap := getExtMapByURI(matchedSDPMap, sdp.SDESMidURI)
sdesStreamIDExtMap := getExtMapByURI(matchedSDPMap, sdp.SDESRTPStreamIDURI)
if sdesMidExtMap == nil || sdesStreamIDExtMap == nil {
return fmt.Errorf("mid and rid RTP Extensions required for Simulcast")
}

b := make([]byte, receiveMTU)
var mid, rid string
for readCount := 0; readCount <= simulcastProbeCount; readCount++ {
i, err := rtpStream.Read(b)
if err != nil {
return err
}

maybeMid, maybeRid, payloadType, err := handleUnknownRTPPacket(b[:i], sdesMidExtMap, sdesStreamIDExtMap)
if err != nil {
return err
}

if maybeMid != "" {
mid = maybeMid
}
if maybeRid != "" {
rid = maybeRid
}

if mid == "" || rid == "" {
continue
}

codec, err := pc.api.mediaEngine.getCodec(payloadType)
if err != nil {
return err
}

for _, t := range pc.GetTransceivers() {
if t.Mid() != mid || t.Receiver() == nil {
continue
}

track, err := t.Receiver().receiveForRid(rid, codec, ssrc)
if err != nil {
return err
}
pc.onTrack(track, t.Receiver())
return nil
}
}

return fmt.Errorf("incoming SSRC failed Simulcast probing")
}

// undeclaredMediaProcessor handles RTP/RTCP packets that don't match any a:ssrc lines
func (pc *PeerConnection) undeclaredMediaProcessor() {
go func() {
for {
srtpSession, err := pc.dtlsTransport.getSRTPSession()
if err != nil {
pc.log.Warnf("drainSRTP failed to open SrtpSession: %v", err)
pc.log.Warnf("undeclaredMediaProcessor failed to open SrtpSession: %v", err)
return
}

_, ssrc, err := srtpSession.AcceptStream()
stream, ssrc, err := srtpSession.AcceptStream()
if err != nil {
pc.log.Warnf("Failed to accept RTP %v", err)
return
}

if !handleUndeclaredSSRC(ssrc) {
pc.log.Warnf("Incoming unhandled RTP ssrc(%d), OnTrack will not be fired", ssrc)
if err := pc.handleUndeclaredSSRC(stream, ssrc); err != nil {
pc.log.Errorf("Incoming unhandled RTP ssrc(%d), OnTrack will not be fired. %v", ssrc, err)
}
}
}()
Expand All @@ -1096,7 +1174,7 @@ func (pc *PeerConnection) drainSRTP() {
for {
srtcpSession, err := pc.dtlsTransport.getSRTCPSession()
if err != nil {
pc.log.Warnf("drainSRTP failed to open SrtcpSession: %v", err)
pc.log.Warnf("undeclaredMediaProcessor failed to open SrtcpSession: %v", err)
return
}

Expand Down Expand Up @@ -1694,13 +1772,14 @@ func (pc *PeerConnection) startRTP(isRenegotiation bool, remoteDesc *SessionDesc

t.Receiver().Track().mu.Lock()
ssrc := t.Receiver().Track().ssrc
if _, ok := trackDetails[ssrc]; ok {
incoming := trackDetails[ssrc]
t.Receiver().Track().id = incoming.id
t.Receiver().Track().label = incoming.label

if details := trackDetailsForSSRC(trackDetails, ssrc); details != nil {
t.Receiver().Track().id = details.id
t.Receiver().Track().label = details.label
t.Receiver().Track().mu.Unlock()
continue
}

t.Receiver().Track().mu.Unlock()

if err := t.Receiver().Stop(); err != nil {
Expand All @@ -1721,7 +1800,7 @@ func (pc *PeerConnection) startRTP(isRenegotiation bool, remoteDesc *SessionDesc
pc.startRTPSenders(currentTransceivers)

if !isRenegotiation {
pc.drainSRTP()
pc.undeclaredMediaProcessor()
if haveApplicationMediaSection(remoteDesc.parsed) {
pc.startSCTP()
}
Expand Down
1 change: 1 addition & 0 deletions rtpcodingparameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package webrtc
// This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself
// http://draft.ortc.org/#dom-rtcrtpcodingparameters
type RTPCodingParameters struct {
RID string `json:"rid"`
SSRC uint32 `json:"ssrc"`
PayloadType uint8 `json:"payloadType"`
}
2 changes: 1 addition & 1 deletion rtpreceiveparameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package webrtc

// RTPReceiveParameters contains the RTP stack settings used by receivers
type RTPReceiveParameters struct {
Encodings RTPDecodingParameters
Encodings []RTPDecodingParameters
}
Loading

0 comments on commit 6ee528d

Please sign in to comment.