Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
Cleanup and fixing sfu behaviour.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcottontensor committed Oct 27, 2023
1 parent bbcfe8a commit adfca6c
Showing 1 changed file with 162 additions and 47 deletions.
209 changes: 162 additions & 47 deletions SignallingWebServer/cirrus.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,83 @@ console.logColor(logging.Cyan, `Running Cirrus - The Pixel Streaming reference i

let nextPlayerId = 1;

const StreamerType = { Regular: 0, SFU: 1 };

class Streamer {
constructor(initialId, ws, type) {
this.id = initialId;
this.ws = ws;
this.type = type;
this.idCommitted = false;
}

// registers this streamers id
commitId(id) {
this.id = id;
this.idCommitted = true;
}

// returns true if we have a valid id
isIdCommitted() {
return this.idCommitted;
}

// links this streamer to a subscribed SFU player (player component of an SFU)
addSFUPlayer(sfuPlayerId) {
if (!!this.SFUPlayerId && this.SFUPlayerId != sfuPlayerId) {
console.error(`Streamer ${this.id} already has an SFU ${this.SFUPlayerId}. Trying to add ${sfuPlayerId} as SFU.`);
return;
}
this.SFUPlayerId = sfuPlayerId;
}

// removes the previously subscribed SFU player
removeSFUPlayer() {
delete this.SFUPlayerId;
}

// gets the player id of the subscribed SFU if any
getSFUPlayerId() {
return this.SFUPlayerId;
}

// returns true if this streamer is forwarding another streamer
isSFU() {
return this.type == StreamerType.SFU;
}

// links this streamer to a player, used for SFU connections since they have both components
setSFUPlayerComponent(playerComponent) {
if (!this.isSFU()) {
console.error(`Trying to add an SFU player component ${playerComponent.id} to streamer ${this.id} but it is not an SFU type.`);
return;
}
this.sfuPlayerComponent = playerComponent;
}

// gets the player component for this sfu
getSFUPlayerComponent() {
if (!this.isSFU()) {
console.error(`Trying to get an SFU player component from streamer ${this.id} but it is not an SFU type.`);
return null;
}
return this.sfuPlayerComponent;
}
}

const PlayerType = { Regular: 0, SFU: 1 };
const WhoSendsOffer = { Streamer: 0, Browser: 1 };

class Player {
constructor(id, ws, type, browserSendOffer) {
constructor(id, ws, type, whoSendsOffer) {
this.id = id;
this.ws = ws;
this.type = type;
this.browserSendOffer = browserSendOffer;
this.whoSendsOffer = whoSendsOffer;
}

isSFU() {
return this.type == PlayerType.SFU;
}

subscribe(streamerId) {
Expand All @@ -306,13 +375,9 @@ class Player {
this.streamerId = streamerId;
if (this.type == PlayerType.SFU) {
let streamer = streamers.get(this.streamerId);
if (!!streamer.SFUId) {
console.error(`Streamer ${this.streamerId} already has an SFU (${streamer.SFUId}) but we're trying to register player ${this.id} as an SFU.`);
} else {
streamer.SFUId = this.id;
}
streamer.addSFUPlayer(this.id);
}
const msg = { type: 'playerConnected', playerId: this.id, dataChannel: true, sfu: this.type == PlayerType.SFU, sendOffer: !this.browserSendOffer };
const msg = { type: 'playerConnected', playerId: this.id, dataChannel: true, sfu: this.type == PlayerType.SFU, sendOffer: this.whoSendsOffer == WhoSendsOffer.Streamer };
logOutgoing(this.streamerId, msg);
this.sendFrom(msg);
}
Expand All @@ -321,10 +386,10 @@ class Player {
if (this.streamerId && streamers.has(this.streamerId)) {
if (this.type == PlayerType.SFU) {
let streamer = streamers.get(this.streamerId);
if (!streamer.SFUId || streamer.SFUId != this.id) {
console.error(`Trying to unsibscribe SFU player ${this.id} from streamer ${streamer.id} but the current SFUId does not match (${streamer.SFUId}).`)
if (streamer.getSFUPlayerId() != this.id) {
console.error(`Trying to unsibscribe SFU player ${this.id} from streamer ${streamer.id} but the current SFUId does not match (${streamer.getSFUPlayerId()}).`)
} else {
delete streamer.SFUId;
streamer.removeSFUPlayer();
}
}
const msg = { type: 'playerDisconnected', playerId: this.id };
Expand Down Expand Up @@ -364,22 +429,40 @@ class Player {
const msgString = JSON.stringify(message);
this.ws.send(msgString);
}

setSFUStreamerComponent(streamerComponent) {
if (!this.isSFU()) {
console.error(`Trying to add an SFU streamer component ${streamerComponent.id} to player ${this.id} but it is not an SFU type.`);
return;
}
this.sfuStreamerComponent = streamerComponent;
}

getSFUStreamerComponent() {
if (!this.isSFU()) {
console.error(`Trying to get an SFU streamer component from player ${this.id} but it is not an SFU type.`);
return null;
}
return this.sfuStreamerComponent;
}
};

let streamers = new Map(); // streamerId <-> streamer socket
let players = new Map(); // playerId <-> player, where player is either a web-browser or a native webrtc player
let streamers = new Map(); // streamerId <-> streamer
let players = new Map(); // playerId <-> player/peer/viewer
const LegacyStreamerPrefix = "__LEGACY_STREAMER__"; // old streamers that dont know how to ID will be assigned this id prefix.
const streamerIdTimeoutSecs = 5;

// gets the SFU subscribed to this streamer if any.
function getSFUForStreamer(streamerId) {
if (!streamers.has(streamerId)) {
return null;
}
const streamer = streamers.get(streamerId);
if (!streamer.SFUId) {
const sfuPlayerId = streamer.getSFUPlayerId();
if (!!sfuPlayerId) {
return null;
}
return players.get(streamer.SFUId);
return players.get(sfuPlayerId);
}

function logIncoming(sourceName, msg) {
Expand Down Expand Up @@ -472,13 +555,34 @@ function requestStreamerId(streamer) {
}, streamerIdTimeoutSecs * 1000);
}

function sanitizeStreamerId(id) {
let maxPostfix = -1;
for (let [streamerId, streamer] of streamers) {
const idMatchRegex = /^(.*?)(\d*)$/;
const [, baseId, postfix] = streamerId.match(idMatchRegex);
if (baseId != id) {
continue;
}
const numPostfix = Number(postfix);
if (numPostfix > maxPostfix) {
maxPostfix = numPostfix
}
}
if (maxPostfix >= 0) {
return id + (maxPostfix + 1);
}
return id;
}

function registerStreamer(id, streamer) {
streamer.id = id;
streamers.set(streamer.id, streamer);
// make sure the id is unique
const uniqueId = sanitizeStreamerId(id);
streamer.commitId(uniqueId);
if (!!streamer.idTimer) {
clearTimeout(streamer.idTimer);
delete streamer.idTimer;
}
streamers.set(uniqueId, streamer);
console.logColor(logging.Green, `Registered new streamer: ${streamer.id}`);
}

Expand Down Expand Up @@ -558,7 +662,8 @@ streamerServer.on('connection', function (ws, req) {
console.logColor(logging.Green, `Streamer connected: ${req.connection.remoteAddress}`);
sendStreamerConnectedToMatchmaker();

let streamer = { id: req.connection.remoteAddress, ws: ws };
const temporaryId = req.connection.remoteAddress;
let streamer = new Streamer(temporaryId, ws, StreamerType.Regular);

ws.on('message', (msgRaw) => {
var msg;
Expand All @@ -569,6 +674,7 @@ streamerServer.on('connection', function (ws, req) {
ws.close(1008, 'Cannot parse');
return;
}
console.log(msgRaw);

let handler = streamerMessageHandlers.get(msg.type);
if (!handler || (typeof handler != 'function')) {
Expand Down Expand Up @@ -605,13 +711,13 @@ function forwardSFUMessageToPlayer(sfuPlayer, msg) {
const playerId = getPlayerIdFromMessage(msg);
const player = players.get(playerId);
if (player) {
logForward(sfuPlayer.streamer.id, playerId, msg);
logForward(sfuPlayer.getSFUStreamerComponent().id, playerId, msg);
player.sendTo(msg);
}
}

function forwardSFUMessageToStreamer(sfuPlayer, msg) {
logForward(sfuPlayer.streamer.id, sfuPlayer.streamerId, msg);
logForward(sfuPlayer.getSFUStreamerComponent().id, sfuPlayer.streamerId, msg);
msg.sfuId = sfuPlayer.id;
sfuPlayer.sendFrom(msg);
}
Expand All @@ -621,7 +727,7 @@ function onPeerDataChannelsSFUMessage(sfuPlayer, msg) {
const playerId = getPlayerIdFromMessage(msg);
const player = players.get(playerId);
if (player) {
logForward(sfuPlayer.streamer.id, playerId, msg);
logForward(sfuPlayer.getSFUStreamerComponent().id, playerId, msg);
player.sendTo(msg);
player.datachannel = true;
}
Expand All @@ -631,56 +737,60 @@ function onPeerDataChannelsSFUMessage(sfuPlayer, msg) {
function requestSFUStreamerId(sfuPlayer) {
// request id
const msg = { type: "identify" };
logOutgoing(sfuPlayer.streamer.id, msg);
sfuPlayer.streamer.ws.send(JSON.stringify(msg));
const sfuStreamerComponent = sfuPlayer.getSFUStreamerComponent();
logOutgoing(sfuStreamerComponent.id, msg);
sfuStreamerComponent.ws.send(JSON.stringify(msg));

sfuPlayer.streamer.idTimer = setTimeout(function() {
sfuStreamerComponent.idTimer = setTimeout(function() {
// streamer did not respond in time. give it a legacy id.
const newLegacyId = getUniqueSFUId();
if (newLegacyId.length == 0) {
const error = `Ran out of legacy ids.`;
console.error(error);
sfuPlayer.ws.close(1008, error);
} else {
sfuPlayer.streamer.id = newLegacyId;
sfuStreamerComponent.id = newLegacyId;
}
}, streamerIdTimeoutSecs * 1000);
}

function onSFUMessageId(sfuPlayer, msg) {
logIncoming(sfuPlayer.streamer.id, msg);
sfuPlayer.streamer.id = msg.id;
const sfuStreamerComponent = sfuPlayer.getSFUStreamerComponent();
logIncoming(sfuStreamerComponent.id, msg);
sfuStreamerComponent.id = msg.id;

if (!!sfuPlayer.streamer.idTimer) {
clearTimeout(sfuPlayer.streamer.idTimer);
delete sfuPlayer.streamer.idTimer;
if (!!sfuStreamerComponent.idTimer) {
clearTimeout(sfuStreamerComponent.idTimer);
delete sfuStreamerComponent.idTimer;
}
}

function onSFUMessageStartStreaming(sfuPlayer, msg) {
logIncoming(sfuPlayer.streamer.id, msg);
if (streamers.has(sfuPlayer.streamer.id)) {
console.error(`SFU ${sfuPlayer.streamer.id} is already registered as a streamer and streaming.`)
const sfuStreamerComponent = sfuPlayer.getSFUStreamerComponent();
logIncoming(sfuStreamerComponent.id, msg);
if (streamers.has(sfuStreamerComponent.id)) {
console.error(`SFU ${sfuStreamerComponent.id} is already registered as a streamer and streaming.`)
return;
}

registerStreamer(sfuPlayer.streamer.id, sfuPlayer.streamer);
registerStreamer(sfuStreamerComponent.id, sfuStreamerComponent);
}

function onSFUMessageStopStreaming(sfuPlayer, msg) {
logIncoming(sfuPlayer.streamer.id, msg);
if (!streamers.has(sfuPlayer.streamer.id)) {
console.error(`SFU ${sfuPlayer.streamer.id} is not registered as a streamer or streaming.`)
const sfuStreamerComponent = sfuPlayer.getSFUStreamerComponent();
logIncoming(sfuStreamerComponent.id, msg);
if (!streamers.has(sfuStreamerComponent.id)) {
console.error(`SFU ${sfuStreamerComponent.id} is not registered as a streamer or streaming.`)
return;
}

onStreamerDisconnected(sfuPlayer.streamer);
onStreamerDisconnected(sfuStreamerComponent);
}

function onSFUDisconnected(sfuPlayer) {
console.log("disconnecting SFU from streamer");
disconnectAllPlayers(sfuPlayer.id);
onStreamerDisconnected(sfuPlayer.streamer);
onStreamerDisconnected(sfuPlayer.getSFUStreamerComponent());
sfuPlayer.unsubscribe();
sfuPlayer.ws.close(4000, "SFU Disconnected");
players.delete(sfuPlayer.id);
Expand All @@ -704,9 +814,14 @@ sfuServer.on('connection', function (ws, req) {

let playerId = sanitizePlayerId(nextPlayerId++);
console.logColor(logging.Green, `SFU (${req.connection.remoteAddress}) connected `);
let player = new Player(playerId, ws, PlayerType.SFU, false);
player.streamer = { id: req.connection.remoteAddress, ws: ws }; // SFU also has a streamer component
players.set(playerId, player);

let streamerComponent = new Streamer(req.connection.remoteAddress, ws, StreamerType.SFU);
let playerComponent = new Player(playerId, ws, PlayerType.SFU, WhoSendsOffer.Streamer);

streamerComponent.setSFUPlayerComponent(playerComponent);
playerComponent.setSFUStreamerComponent(streamerComponent);

players.set(playerId, playerComponent);

ws.on('message', (msgRaw) => {
var msg;
Expand Down Expand Up @@ -739,20 +854,20 @@ sfuServer.on('connection', function (ws, req) {

ws.on('close', function(code, reason) {
console.error(`SFU disconnected: ${code} - ${reason}`);
onSFUDisconnected(player);
onSFUDisconnected(playerComponent);
});

ws.on('error', function(error) {
console.error(`SFU connection error: ${error}`);
onSFUDisconnected(player);
onSFUDisconnected(playerComponent);
try {
ws.close(1006 /* abnormal closure */, error);
} catch(err) {
console.error(`ERROR: ws.on error: ${err.message}`);
}
});

requestStreamerId(player.streamer);
requestStreamerId(playerComponent.getSFUStreamerComponent());
});

let playerCount = 0;
Expand Down Expand Up @@ -823,7 +938,7 @@ playerServer.on('connection', function (ws, req) {
var url = require('url');
const parsedUrl = url.parse(req.url);
const urlParams = new URLSearchParams(parsedUrl.search);
const browserSendOffer = urlParams.has('OfferToReceive') && urlParams.get('OfferToReceive') !== 'false';
const whoSendsOffer = urlParams.has('OfferToReceive') && urlParams.get('OfferToReceive') !== 'false' ? WhoSendsOffer.Browser : WhoSendsOffer.Streamer;

if (playerCount + 1 > maxPlayerCount && maxPlayerCount !== -1)
{
Expand All @@ -835,7 +950,7 @@ playerServer.on('connection', function (ws, req) {
++playerCount;
let playerId = sanitizePlayerId(nextPlayerId++);
console.logColor(logging.Green, `player ${playerId} (${req.connection.remoteAddress}) connected`);
let player = new Player(playerId, ws, PlayerType.Regular, browserSendOffer);
let player = new Player(playerId, ws, PlayerType.Regular, whoSendsOffer);
players.set(playerId, player);

ws.on('message', (msgRaw) =>{
Expand Down

0 comments on commit adfca6c

Please sign in to comment.