Skip to content

Commit

Permalink
rtmp-services: Refactor Twitch/Amazon IVS support
Browse files Browse the repository at this point in the history
Separate the commonly used functions into service-ingest.c/h.
Split the Amazon IVS support out of the Twitch specific files
and into the new amazon-ivs.c/h files. This allows for clean
usage of `struct ingest` between the two services.
  • Loading branch information
lexano-ivs authored and RytoEX committed Sep 5, 2024
1 parent c61417f commit 8f29480
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 301 deletions.
4 changes: 4 additions & 0 deletions plugins/rtmp-services/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ target_sources(
rtmp-custom.c
rtmp-format-ver.h
rtmp-services-main.c
service-specific/amazon-ivs.c
service-specific/amazon-ivs.h
service-specific/dacast.c
service-specific/dacast.h
service-specific/nimotv.c
service-specific/nimotv.h
service-specific/service-ingest.c
service-specific/service-ingest.h
service-specific/showroom.c
service-specific/showroom.h
service-specific/twitch.c
Expand Down
35 changes: 20 additions & 15 deletions plugins/rtmp-services/rtmp-common.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "service-specific/nimotv.h"
#include "service-specific/showroom.h"
#include "service-specific/dacast.h"
#include "service-specific/amazon-ivs.h"

struct rtmp_common {
char *service;
Expand Down Expand Up @@ -429,8 +430,9 @@ static bool fill_twitch_servers_locked(obs_property_t *servers_prop)
return false;

for (size_t i = 0; i < count; i++) {
struct twitch_ingest ing = twitch_ingest(i);
obs_property_list_add_string(servers_prop, ing.name, ing.url);
struct ingest twitch_ing = twitch_ingest(i);
obs_property_list_add_string(servers_prop, twitch_ing.name,
twitch_ing.url);
}

return true;
Expand Down Expand Up @@ -466,18 +468,20 @@ static bool fill_amazon_ivs_servers_locked(obs_property_t *servers_prop)

if (rtmps_available) {
for (size_t i = 0; i < count; i++) {
struct twitch_ingest ing = amazon_ivs_ingest(i);
dstr_printf(&name_buffer, "%s (RTMPS)", ing.name);
obs_property_list_add_string(
servers_prop, name_buffer.array, ing.rtmps_url);
struct ingest amazon_ivs_ing = amazon_ivs_ingest(i);
dstr_printf(&name_buffer, "%s (RTMPS)",
amazon_ivs_ing.name);
obs_property_list_add_string(servers_prop,
name_buffer.array,
amazon_ivs_ing.rtmps_url);
}
}

for (size_t i = 0; i < count; i++) {
struct twitch_ingest ing = amazon_ivs_ingest(i);
dstr_printf(&name_buffer, "%s (RTMP)", ing.name);
struct ingest amazon_ivs_ing = amazon_ivs_ingest(i);
dstr_printf(&name_buffer, "%s (RTMP)", amazon_ivs_ing.name);
obs_property_list_add_string(servers_prop, name_buffer.array,
ing.url);
amazon_ivs_ing.url);
}

dstr_free(&name_buffer);
Expand Down Expand Up @@ -868,31 +872,32 @@ static const char *rtmp_common_url(void *data)

if (service->service && strcmp(service->service, "Twitch") == 0) {
if (service->server && strcmp(service->server, "auto") == 0) {
struct twitch_ingest ing;
struct ingest twitch_ing;

twitch_ingests_refresh(3);

twitch_ingests_lock();
ing = twitch_ingest(0);
twitch_ing = twitch_ingest(0);
twitch_ingests_unlock();

return ing.url;
return twitch_ing.url;
}
}

if (service->service && strcmp(service->service, "Amazon IVS") == 0) {
if (service->server &&
strncmp(service->server, "auto", 4) == 0) {
struct twitch_ingest ing;
struct ingest amazon_ivs_ing;
bool rtmp = strcmp(service->server, "auto-rtmp") == 0;

amazon_ivs_ingests_refresh(3);

amazon_ivs_ingests_lock();
ing = amazon_ivs_ingest(0);
amazon_ivs_ing = amazon_ivs_ingest(0);
amazon_ivs_ingests_unlock();

return rtmp ? ing.url : ing.rtmps_url;
return rtmp ? amazon_ivs_ing.url
: amazon_ivs_ing.rtmps_url;
}
}

Expand Down
58 changes: 58 additions & 0 deletions plugins/rtmp-services/service-specific/amazon-ivs.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#include "service-ingest.h"
#include "amazon-ivs.h"

static struct service_ingests amazon_ivs = {
.update_info = NULL,
.mutex = PTHREAD_MUTEX_INITIALIZER,
.ingests_refreshed = false,
.ingests_refreshing = false,
.ingests_loaded = false,
.cur_ingests = {0},
.cache_old_filename = "amazon_ivs_ingests.json",
.cache_new_filename = "amazon_ivs_ingests.new.json"};

void init_amazon_ivs_data(void)
{
init_service_data(&amazon_ivs);
}

void load_amazon_ivs_data(void)
{
struct ingest def = {
.name = bstrdup("Default"),
.url = bstrdup(
"rtmps://ingest.global-contribute.live-video.net:443/app/")};
load_service_data(&amazon_ivs, "amazon_ivs_ingests.json", &def);
}

void unload_amazon_ivs_data(void)
{
unload_service_data(&amazon_ivs);
}

void amazon_ivs_ingests_refresh(int seconds)
{
service_ingests_refresh(
&amazon_ivs, seconds, "[amazon ivs ingest update] ",
"https://ingest.contribute.live-video.net/ingests");
}

void amazon_ivs_ingests_lock(void)
{
pthread_mutex_lock(&amazon_ivs.mutex);
}

void amazon_ivs_ingests_unlock(void)
{
pthread_mutex_unlock(&amazon_ivs.mutex);
}

size_t amazon_ivs_ingest_count(void)
{
return amazon_ivs.cur_ingests.num;
}

struct ingest amazon_ivs_ingest(size_t idx)
{
return get_ingest(&amazon_ivs, idx);
}
8 changes: 8 additions & 0 deletions plugins/rtmp-services/service-specific/amazon-ivs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once

#include "service-ingest.h"

extern void amazon_ivs_ingests_lock(void);
extern void amazon_ivs_ingests_unlock(void);
extern size_t amazon_ivs_ingest_count(void);
extern struct ingest amazon_ivs_ingest(size_t idx);
199 changes: 199 additions & 0 deletions plugins/rtmp-services/service-specific/service-ingest.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#include <util/platform.h>
#include <obs-module.h>
#include <util/dstr.h>
#include <jansson.h>
#include "service-ingest.h"

extern const char *get_module_name(void);

void init_service_data(struct service_ingests *si)
{
da_init(si->cur_ingests);
pthread_mutex_init(&si->mutex, NULL);
}

static void free_ingests(struct service_ingests *si)
{
for (size_t i = 0; i < si->cur_ingests.num; i++) {
struct ingest *ingest = si->cur_ingests.array + i;
bfree(ingest->name);
bfree(ingest->url);
bfree(ingest->rtmps_url);
}

da_free(si->cur_ingests);
}

static bool load_ingests(struct service_ingests *si, const char *json,
bool write_file)
{
json_t *root;
json_t *ingests;
bool success = false;
char *cache_old;
char *cache_new;
size_t count;

root = json_loads(json, 0, NULL);
if (!root)
goto finish;

ingests = json_object_get(root, "ingests");
if (!ingests)
goto finish;

count = json_array_size(ingests);
if (count <= 1 && si->cur_ingests.num)
goto finish;

free_ingests(si);

for (size_t i = 0; i < count; i++) {
json_t *item = json_array_get(ingests, i);
json_t *item_name = json_object_get(item, "name");
json_t *item_url = json_object_get(item, "url_template");
json_t *item_rtmps_url =
json_object_get(item, "url_template_secure");
struct ingest ingest = {0};
struct dstr url = {0};
struct dstr rtmps_url = {0};

if (!item_name || !item_url)
continue;

const char *url_str = json_string_value(item_url);
const char *rtmps_url_str = json_string_value(item_rtmps_url);
const char *name_str = json_string_value(item_name);

/* At the moment they currently mis-spell "deprecated",
* but that may change in the future, so blacklist both */
if (strstr(name_str, "deprecated") != NULL ||
strstr(name_str, "depracated") != NULL)
continue;

dstr_copy(&url, url_str);
dstr_replace(&url, "/{stream_key}", "");

dstr_copy(&rtmps_url, rtmps_url_str);
dstr_replace(&rtmps_url, "/{stream_key}", "");

ingest.name = bstrdup(name_str);
ingest.url = url.array;
ingest.rtmps_url = rtmps_url.array;

da_push_back(si->cur_ingests, &ingest);
}

if (!si->cur_ingests.num)
goto finish;

success = true;

if (!write_file)
goto finish;

cache_old = obs_module_config_path(si->cache_old_filename);
cache_new = obs_module_config_path(si->cache_new_filename);

os_quick_write_utf8_file(cache_new, json, strlen(json), false);
os_safe_replace(cache_old, cache_new, NULL);

bfree(cache_old);
bfree(cache_new);

finish:
if (root)
json_decref(root);
return success;
}

static bool ingest_update(void *param, struct file_download_data *data)
{
struct service_ingests *service = param;
bool success;

pthread_mutex_lock(&service->mutex);
success = load_ingests(service, (const char *)data->buffer.array, true);
pthread_mutex_unlock(&service->mutex);

if (success) {
os_atomic_set_bool(&service->ingests_refreshed, true);
os_atomic_set_bool(&service->ingests_loaded, true);
}

return true;
}

void service_ingests_refresh(struct service_ingests *si, int seconds,
const char *log_prefix, const char *file_url)
{
if (os_atomic_load_bool(&si->ingests_refreshed))
return;

if (!os_atomic_load_bool(&si->ingests_refreshing)) {
os_atomic_set_bool(&si->ingests_refreshing, true);

si->update_info =
update_info_create_single(log_prefix, get_module_name(),
file_url, ingest_update, si);
}

/* wait five seconds max when loading ingests for the first time */
if (!os_atomic_load_bool(&si->ingests_loaded)) {
for (int i = 0; i < seconds * 100; i++) {
if (os_atomic_load_bool(&si->ingests_refreshed)) {
break;
}
os_sleep_ms(10);
}
}
}

void load_service_data(struct service_ingests *si, const char *cache_filename,
struct ingest *def)
{
char *service_cache = obs_module_config_path(cache_filename);

pthread_mutex_lock(&si->mutex);
da_push_back(si->cur_ingests, def);
pthread_mutex_unlock(&si->mutex);

if (os_file_exists(service_cache)) {
char *data = os_quick_read_utf8_file(service_cache);
bool success;

pthread_mutex_lock(&si->mutex);
success = load_ingests(si, data, false);
pthread_mutex_unlock(&si->mutex);

if (success) {
os_atomic_set_bool(&si->ingests_loaded, true);
}

bfree(data);
}

bfree(service_cache);
}

void unload_service_data(struct service_ingests *si)
{
update_info_destroy(si->update_info);
free_ingests(si);
pthread_mutex_destroy(&si->mutex);
}

struct ingest get_ingest(struct service_ingests *si, size_t idx)
{
struct ingest ingest;

if (si->cur_ingests.num <= idx) {
ingest.name = NULL;
ingest.url = NULL;
ingest.rtmps_url = NULL;
} else {
ingest = *(struct ingest *)(si->cur_ingests.array + idx);
}

return ingest;
}
Loading

0 comments on commit 8f29480

Please sign in to comment.