Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
working on handling multiple sfus gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
mcottontensor committed Oct 24, 2023
1 parent 127feac commit 01d8056
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 63 deletions.
3 changes: 3 additions & 0 deletions SFU/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ for(let arg of process.argv){

const config = {
signallingURL: "ws://localhost:8889",
SFUId: "SFU",
subscribeStreamerId: "DefaultStreamer",
retrySubscribeDelaySecs: 10,

mediasoup: {
worker: {
Expand Down
62 changes: 59 additions & 3 deletions SFU/sfu_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ const WebSocket = require('ws');
const mediasoup = require('mediasoup_prebuilt');
const mediasoupSdp = require('mediasoup-sdp-bridge');

if (!config.retrySubscribeDelaySecs) {
config.retrySubscribeDelaySecs = 10;
}

let signalServer = null;
let mediasoupRouter;
let streamer = null;
Expand All @@ -11,7 +15,7 @@ let peers = new Map();
function connectSignalling(server) {
console.log("Connecting to Signalling Server at %s", server);
signalServer = new WebSocket(server);
signalServer.addEventListener("open", _ => { console.log(`Connected to signalling server`); });
signalServer.addEventListener("open", _ => onSignallingConnected());
signalServer.addEventListener("error", result => { console.log(`Error: ${result.message}`); });
signalServer.addEventListener("message", result => onSignallingMessage(result.data));
signalServer.addEventListener("close", result => {
Expand All @@ -24,6 +28,42 @@ function connectSignalling(server) {
});
}

async function onSignallingConnected() {
console.log(`Connected to signalling server`);
//signalServer.send(JSON.stringify({type: 'listStreamers'}));
}

async function onStreamerList(msg) {
let success = false;

// subscribe to either the configured streamer, or if not configured, just grab the first id
if (msg.ids.length > 0) {
if (!!config.subscribeStreamerId) {
if (msg.ids.includes(config.subscribeStreamerId)) {
signalServer.send(JSON.stringify({type: 'subscribe', streamerId: config.subscribeStreamerId}));
success = true;
}
} else {
signalServer.send(JSON.stringify({type: 'subscribe', streamerId: msg.ids[0]}));
success = true;
}
}

if (!success) {
// did not subscribe to anything
console.log(`No subscribe (${config.retrySubscribeDelaySecs}`)
setTimeout(function() {
signalServer.send(JSON.stringify({type: 'listStreamers'}));
}, config.retrySubscribeDelaySecs * 1000);
}
}

async function onIdentify(msg) {
console.log(JSON.stringify({type: 'endpointId', id: config.SFUId}));
signalServer.send(JSON.stringify({type: 'endpointId', id: config.SFUId}));
signalServer.send(JSON.stringify({type: 'listStreamers'}));
}

async function onStreamerOffer(sdp) {
console.log("Got offer from streamer");

Expand Down Expand Up @@ -228,7 +268,7 @@ function onLayerPreference(msg) {
}

async function onSignallingMessage(message) {
//console.log(`Got MSG: ${message}`);
console.log(`Got MSG: ${message}`);
const msg = JSON.parse(message);

if (msg.type == 'offer') {
Expand All @@ -255,6 +295,14 @@ async function onSignallingMessage(message) {
else if (msg.type == 'layerPreference') {
onLayerPreference(msg);
}
else if (msg.type == 'streamerList') {
console.log('WA WA WEE WOO ----------------------------------------------------------------------');
onStreamerList(msg);
}
else if (msg.type == 'identify') {
console.log('identifying...');
onIdentify(msg);
}
}

async function startMediasoup() {
Expand All @@ -276,6 +324,14 @@ async function startMediasoup() {
return mediasoupRouter;
}

async function onICEStateChange(identifier, iceState) {
console.log("%s ICE state changed to %s", identifier, iceState);

if (identifier == 'Streamer' && iceState == 'completed') {
signalServer.send(JSON.stringify({type: 'startStreaming'}));
}
}

async function createWebRtcTransport(identifier) {
const {
listenIps,
Expand All @@ -291,7 +347,7 @@ async function createWebRtcTransport(identifier) {
initialAvailableOutgoingBitrate: initialAvailableOutgoingBitrate
});

transport.on("icestatechange", (iceState) => { console.log("%s ICE state changed to %s", identifier, iceState); });
transport.on("icestatechange", (iceState) => onICEStateChange(identifier, iceState));
transport.on("iceselectedtuplechange", (iceTuple) => { console.log("%s ICE selected tuple %s", identifier, JSON.stringify(iceTuple)); });
transport.on("sctpstatechange", (sctpState) => { console.log("%s SCTP state changed to %s", identifier, sctpState); });

Expand Down
Loading

0 comments on commit 01d8056

Please sign in to comment.