Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Multi streamers #74

Merged
merged 3 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Frontend/library/src/Config/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export class TextParameters {
*/
export class OptionParameters {
static PreferredCodec = 'PreferredCodec';
static StreamerId = 'StreamerId';
}

export class Config {
Expand Down Expand Up @@ -121,6 +122,15 @@ export class Config {
)
);

this.optionParameters.set(OptionParameters.StreamerId,
new SettingOption(
OptionParameters.StreamerId,
'Streamer ID',
'The ID of the streamer to stream.',
'',
[])
);

/**
* Enum Parameters
*/
Expand Down Expand Up @@ -400,6 +410,10 @@ export class Config {
psSettingsSection,
this.textParameters.get(TextParameters.SignallingServerUrl)
);
this.addSettingOption(
psSettingsSection,
this.optionParameters.get(OptionParameters.StreamerId)
);
this.addSettingFlag(
psSettingsSection,
this.flags.get(Flags.AutoConnect)
Expand Down
31 changes: 29 additions & 2 deletions Frontend/library/src/WebRtcPlayer/WebRtcPlayerController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { StreamController } from '../VideoPlayer/StreamController';
import {
MessageAnswer,
MessageOffer,
MessageConfig
MessageConfig,
MessageStreamerList
} from '../WebSockets/MessageReceive';
import { FreezeFrameController } from '../FreezeFrame/FreezeFrameController';
import { AFKController } from '../AFK/AFKController';
Expand All @@ -17,7 +18,8 @@ import {
Config,
Flags,
ControlSchemeType,
TextParameters
TextParameters,
OptionParameters
} from '../Config/Config';
import {
EncoderSettings,
Expand Down Expand Up @@ -167,12 +169,23 @@ export class WebRtcPlayerController {
this.webSocketController.onConfig = (
messageConfig: MessageReceive.MessageConfig
) => this.handleOnConfigMessage(messageConfig);
this.webSocketController.onStreamerList = (
messageList: MessageReceive.MessageStreamerList
) => this.handleStreamerListMessage(messageList);
this.webSocketController.onWebSocketOncloseOverlayMessage = (event) =>
this.application.onDisconnect(
`Websocket disconnect (${event.code}) ${
event.reason != '' ? '- ' + event.reason : ''
}`
);
this.webSocketController.onOpen.addEventListener('open', () => {
const urlParams = new URLSearchParams(window.location.search);
if (urlParams.has(OptionParameters.StreamerId)) {
this.webSocketController.sendSubscribe(urlParams.get(OptionParameters.StreamerId));
} else {
this.webSocketController.requestStreamerList();
}
});
this.webSocketController.onClose.addEventListener('close', () => {
this.afkController.stopAfkWarningTimer();

Expand Down Expand Up @@ -212,6 +225,11 @@ export class WebRtcPlayerController {
this.isUsingSFU = false;
this.isQualityController = false;
this.preferredCodec = '';

this.config.addOnOptionSettingChangedListener(OptionParameters.StreamerId, (streamerid) => {
this.webSocketController.sendSubscribe(streamerid);
}
);
}

/**
Expand Down Expand Up @@ -1130,6 +1148,15 @@ export class WebRtcPlayerController {
) => this.handleIceCandidate(iceCandidate);
}

/**
* Handles when the signalling server gives us the list of streamer ids.
*/
handleStreamerListMessage(messageStreamerList: MessageStreamerList) {
Logger.Log(Logger.GetStackTrace(), `Got streamer list ${messageStreamerList.ids}`, 6);
messageStreamerList.ids.unshift(''); // add an empty option at the top
this.config.setOptionSettingOptions(OptionParameters.StreamerId, messageStreamerList.ids);
}

/**
* Handle the RTC Answer from the signaling server
* @param Answer - Answer SDP from the peer.
Expand Down
8 changes: 8 additions & 0 deletions Frontend/library/src/WebSockets/MessageReceive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
export enum MessageRecvTypes {
CONFIG = 'config',
STREAMER_LIST = 'streamerList',
PLAYER_COUNT = 'playerCount',
OFFER = 'offer',
ANSWER = 'answer',
Expand Down Expand Up @@ -34,6 +35,13 @@ export class MessageConfig extends MessageRecv {
peerConnectionOptions: RTCConfiguration;
}

/**
* Streamer List Message Wrapper
*/
export class MessageStreamerList extends MessageRecv {
ids: string[];
}

/**
* Player Count Message wrapper
*/
Expand Down
27 changes: 27 additions & 0 deletions Frontend/library/src/WebSockets/MessageSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import { Logger } from '../Logger/Logger';
* The Send Types that are pushed from the signaling server
*/
export enum MessageSendTypes {
LIST_STREAMERS = 'listStreamers',
SUBSCRIBE = 'subscribe',
UNSUBSCRIBE = 'unsubscribe',
ICE_CANDIDATE = 'iceCandidate',
OFFER = 'offer',
ANSWER = 'answer',
Expand Down Expand Up @@ -43,6 +46,30 @@ export interface Send {
payload: () => string;
}

export class MessageListStreamers extends MessageSend {
constructor() {
super();
this.type = MessageSendTypes.LIST_STREAMERS;
}
}

export class MessageSubscribe extends MessageSend {
streamerId: string;

constructor(streamerid: string) {
super();
this.type = MessageSendTypes.SUBSCRIBE;
this.streamerId = streamerid;
}
}

export class MessageUnsubscribe extends MessageSend {
constructor() {
super();
this.type = MessageSendTypes.UNSUBSCRIBE;
}
}

/**
* Instance Request Message Wrapper
*/
Expand Down
13 changes: 11 additions & 2 deletions Frontend/library/src/WebSockets/SignallingProtocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { WebSocketController } from './WebSocketController';
import {
MessageRecvTypes,
MessageConfig,
MessageStreamerList,
MessagePlayerCount,
MessageAnswer,
MessageOffer,
Expand Down Expand Up @@ -70,18 +71,26 @@ export class SignallingProtocol {
websocketController.signallingProtocol.addMessageHandler(
MessageRecvTypes.CONFIG,
(configPayload: string) => {
// send our pong payload back to the signalling server
Logger.Log(Logger.GetStackTrace(), MessageRecvTypes.CONFIG, 6);
const config: MessageConfig = JSON.parse(configPayload);
websocketController.onConfig(config);
}
);

// STREAMER_LIST
websocketController.signallingProtocol.addMessageHandler(
MessageRecvTypes.STREAMER_LIST,
(listPayload: string) => {
Logger.Log(Logger.GetStackTrace(), MessageRecvTypes.STREAMER_LIST, 6);
const streamerList: MessageStreamerList = JSON.parse(listPayload);
websocketController.onStreamerList(streamerList);
}
);

// PLAYER_COUNT
websocketController.signallingProtocol.addMessageHandler(
MessageRecvTypes.PLAYER_COUNT,
(playerCountPayload: string) => {
// send our pong payload back to the signalling server
Logger.Log(
Logger.GetStackTrace(),
MessageRecvTypes.PLAYER_COUNT,
Expand Down
25 changes: 25 additions & 0 deletions Frontend/library/src/WebSockets/WebSocketController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ declare global {
export class WebSocketController {
WS_OPEN_STATE = 1;
webSocket: WebSocket;
onOpen: EventTarget;
onClose: EventTarget;
signallingProtocol: SignallingProtocol;

constructor() {
this.onOpen = new EventTarget();
this.onClose = new EventTarget();
this.signallingProtocol = new SignallingProtocol();
SignallingProtocol.setupDefaultHandlers(this);
Expand Down Expand Up @@ -116,6 +118,7 @@ export class WebSocketController {
'Connected to the signalling server via WebSocket',
6
);
this.onOpen.dispatchEvent(new Event('open'));
}

/**
Expand All @@ -142,6 +145,21 @@ export class WebSocketController {
this.onClose.dispatchEvent(new Event('close'));
}

requestStreamerList() {
const payload = new MessageSend.MessageListStreamers();
this.webSocket.send(payload.payload());
}

sendSubscribe(streamerid: string) {
const payload = new MessageSend.MessageSubscribe(streamerid);
this.webSocket.send(payload.payload());
}

sendUnsubscribe() {
const payload = new MessageSend.MessageUnsubscribe();
this.webSocket.send(payload.payload());
}

sendWebRtcOffer(offer: RTCSessionDescriptionInit) {
const payload = new MessageSend.MessageWebRTCOffer(offer);
this.webSocket.send(payload.payload());
Expand Down Expand Up @@ -197,6 +215,13 @@ export class WebSocketController {
// eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-empty-function
onConfig(messageConfig: MessageReceive.MessageConfig) {}

/**
* The Message Contains the payload of the peer connection options used for the RTC Peer hand shake
* @param messageConfig - Config Message received from he signaling server
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-empty-function
onStreamerList(messageStreamerList: MessageReceive.MessageStreamerList) {}

/**
* @param iceCandidate - Ice Candidate sent from the Signaling server server's RTC hand shake
*/
Expand Down
1 change: 1 addition & 0 deletions SFU/sfu_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ function connectSignalling(server) {
signalServer.addEventListener("error", result => { console.log(`Error: ${result.message}`); });
signalServer.addEventListener("message", result => onSignallingMessage(result.data));
signalServer.addEventListener("close", result => {
onStreamerDisconnected();
console.log(`Disconnected from signalling server: ${result.code} ${result.reason}`);
console.log("Attempting reconnect to signalling server...");
setTimeout(()=> {
Expand Down
Loading