Skip to content

Commit

Permalink
API: Support kickoff stream by name. v5.14.16
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 18, 2024
1 parent 75a9e3a commit 043b7c6
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 12 deletions.
24 changes: 13 additions & 11 deletions DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,15 @@ API without any authentication:
* `/terraform/v1/ai/transcript/hls/original/:uuid.m3u8` Generate the preview HLS for original stream without overlay text.
* `/terraform/v1/mgmt/beian/query` Query the beian information.
* `/terraform/v1/ai-talk/stage/hello-voices/:file.aac` AI-Talk: Play the example audios.
* `/.well-known/acme-challenge/` HTTPS verify mount for letsencrypt.
* For SRS proxy:
* `/rtc/` Proxy for SRS: HTTP API for WebRTC of SRS media server.
* `/*/*.(flv|m3u8|ts|aac|mp3)` Proxy for SRS: Media stream for HTTP-FLV, HLS, HTTP-TS, HTTP-AAC, HTTP-MP3.
* For static files:
* `/tools/` A set of H5 tools, like simple player, xgplayer, etc, serve by mgmt.
* `/console/` The SRS console, serve by mgmt.
* `/players/` The SRS player, serve by mgmt.
* `/mgmt/` The ui for mgmt, serve by mgmt.

API without token authentication, but with password authentication:

Expand All @@ -923,6 +932,8 @@ Platform, with token authentication:
* `/terraform/v1/mgmt/hooks/apply` Update the HTTP callback.
* `/terraform/v1/mgmt/hooks/query` Query the HTTP callback.
* `/terraform/v1/mgmt/hooks/example` Example target for HTTP callback.
* `/terraform/v1/mgmt/streams/query` Query the active streams.
* `/terraform/v1/mgmt/streams/kickoff` Kickoff the stream by name.
* `/terraform/v1/hooks/srs/verify` Hooks: Verify the stream request URL of SRS.
* `/terraform/v1/hooks/srs/secret/query` Hooks: Query the secret to generate stream URL.
* `/terraform/v1/hooks/srs/secret/update` Hooks: Update the secret to generate stream URL.
Expand Down Expand Up @@ -987,19 +998,9 @@ Platform, with token authentication:
* `/terraform/v1/ai/transcript/fix-queue` Query the fix queue of transcript.
* `/terraform/v1/ai/transcript/overlay-queue` Query the overlay queue of transcript.

Also provided by platform for market:
Also provided by platform for SRS proxy:

* `/api/` SRS: HTTP API of SRS media server. With token authentication.
* `/rtc/` SRS: HTTP API for WebRTC of SRS media server. Without authentication.
* `/*/*.(flv|m3u8|ts|aac|mp3)` SRS: Media stream for HTTP-FLV, HLS, HTTP-TS, HTTP-AAC, HTTP-MP3. Without authentication.
* `/.well-known/acme-challenge/` HTTPS verify mount for letsencrypt. Without authentication.

Also provided by platform for static Files:

* `/tools/` A set of H5 tools, like simple player, xgplayer, etc, serve by mgmt. Without authentication.
* `/console/` The SRS console, serve by mgmt. Without authentication.
* `/players/` The SRS player, serve by mgmt. Without authentication.
* `/mgmt/` The ui for mgmt, serve by mgmt. Without authentication.

**Deprecated** API:

Expand Down Expand Up @@ -1160,6 +1161,7 @@ The following are the update records for the Oryx server.
* Use port 80/443 by default in README. v5.14.14
* Use fastfail for test and utest. v5.14.15
* Rename project to Oryx.[v5.14.15](https://github.com/ossrs/oryx/releases/tag/v5.14.15)
* API: Support kickoff stream by name. v5.14.16
* v5.13:
* Fix bug for vlive and transcript. v5.13.1
* Support AWS Lightsail install script. v5.13.2
Expand Down
161 changes: 161 additions & 0 deletions platform/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"fmt"
"github.com/joho/godotenv"
"io"
"io/ioutil"
"net/http"
"os"
Expand Down Expand Up @@ -277,6 +278,8 @@ func handleHTTPService(ctx context.Context, handler *http.ServeMux) error {
handleMgmtSsl(ctx, handler)
handleMgmtLetsEncrypt(ctx, handler)
handleMgmtCertQuery(ctx, handler)
handleMgmtStreamsQuery(ctx, handler)
handleMgmtStreamsKickoff(ctx, handler)
handleMgmtUI(ctx, handler)

proxy2023, err := httpCreateProxy("http://127.0.0.1:2023")
Expand Down Expand Up @@ -1399,6 +1402,164 @@ func handleMgmtCertQuery(ctx context.Context, handler *http.ServeMux) {
})
}

func handleMgmtStreamsQuery(ctx context.Context, handler *http.ServeMux) {
ep := "/terraform/v1/mgmt/streams/query"
logger.Tf(ctx, "Handle %v", ep)
handler.HandleFunc(ep, func(w http.ResponseWriter, r *http.Request) {
if err := func() error {
var token string
if err := ParseBody(ctx, r.Body, &struct {
Token *string `json:"token"`
}{
Token: &token,
}); err != nil {
return errors.Wrapf(err, "parse body")
}

apiSecret := os.Getenv("SRS_PLATFORM_SECRET")
if err := Authenticate(ctx, apiSecret, token, r.Header); err != nil {
return errors.Wrapf(err, "authenticate")
}

streams, err := rdb.HGetAll(ctx, SRS_STREAM_ACTIVE).Result()
if err != nil {
return errors.Wrapf(err, "hgetall %v", SRS_STREAM_ACTIVE)
}

var streamObjects []*SrsStream
for _, value := range streams {
var stream SrsStream
if err := json.Unmarshal([]byte(value), &stream); err != nil {
return errors.Wrapf(err, "unmarshal %v", value)
}

streamObjects = append(streamObjects, &stream)
}

ohttp.WriteData(ctx, w, r, &struct {
Streams []*SrsStream `json:"streams"`
}{
streamObjects,
})
logger.Tf(ctx, "query streams ok, streams=%v, token=%vB", len(streamObjects), len(token))
return nil
}(); err != nil {
ohttp.WriteError(ctx, w, r, err)
}
})
}

// See SRS error code ERROR_RTMP_CLIENT_NOT_FOUND
const ErrorRtmpClientNotFound = 2049

func handleMgmtStreamsKickoff(ctx context.Context, handler *http.ServeMux) {
ep := "/terraform/v1/mgmt/streams/kickoff"
logger.Tf(ctx, "Handle %v", ep)
handler.HandleFunc(ep, func(w http.ResponseWriter, r *http.Request) {
if err := func() error {
var token string
var vhost, app, stream, clientID string
if err := ParseBody(ctx, r.Body, &struct {
Token *string `json:"token"`
Vhost *string `json:"vhost"`
App *string `json:"app"`
Stream *string `json:"stream"`
ClientID *string `json:"client_id"`
}{
Token: &token, Vhost: &vhost, App: &app, Stream: &stream, ClientID: &clientID,
}); err != nil {
return errors.Wrapf(err, "parse body")
}

apiSecret := os.Getenv("SRS_PLATFORM_SECRET")
if err := Authenticate(ctx, apiSecret, token, r.Header); err != nil {
return errors.Wrapf(err, "authenticate")
}

if vhost == "" {
return errors.New("no vhost")
}
if app == "" {
return errors.New("no app")
}
if stream == "" {
return errors.New("no stream")
}

streamObject := &SrsStream{Vhost: vhost, App: app, Stream: stream}
streamURL := streamObject.StreamURL()
if target, err := rdb.HGet(ctx, SRS_STREAM_ACTIVE, streamURL).Result(); err != nil && err != redis.Nil {
return errors.Wrapf(err, "hget %v %v", SRS_STREAM_ACTIVE, streamURL)
} else if target == "" {
return errors.Errorf("stream not found %v", streamURL)
}

// Start request and parse the code.
requestClient := func(ctx context.Context, clientURL, method string) (int, string, error) {
req, err := http.NewRequest(method, clientURL, nil)
if err != nil {
return 0, "", errors.Wrapf(err, "new request")
}

res, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return 0, "", errors.Wrapf(err, "do request")
}
defer res.Body.Close()

b, err := io.ReadAll(res.Body)
if err != nil {
return 0, "", errors.Wrapf(err, "http read body")
}

if res.StatusCode != http.StatusOK {
return 0, "", errors.Errorf("status %v", res.StatusCode)
}

var code int
if err := json.Unmarshal(b, &struct {
Code *int `json:"code"`
}{
Code: &code,
}); err != nil {
return 0, "", errors.Wrapf(err, "unmarshal %v", string(b))
}
return code, string(b), nil
}

// Whether client exists in SRS server.
var code int
clientURL := fmt.Sprintf("http://127.0.0.1:1985/api/v1/clients/%v", clientID)
if r0, body, err := requestClient(ctx, clientURL, http.MethodGet); err != nil {
return errors.Wrapf(err, "http query client %v", clientURL)
} else if r0 != 0 && r0 != ErrorRtmpClientNotFound {
return errors.Errorf("invalid code=%v, body=%v", r0, body)
} else {
code = r0
}

// Kickoff if exists, ignore if not.
if code == 0 {
if r0, body, err := requestClient(ctx, clientURL, http.MethodDelete); err != nil {
return errors.Wrapf(err, "kickoff %v, body %v", clientURL, body)
} else if r0 != 0 && r0 != ErrorRtmpClientNotFound {
return errors.Errorf("invalid code=%v, body=%v", r0, body)
}
}

if err := rdb.HDel(ctx, SRS_STREAM_ACTIVE, streamURL).Err(); err != nil && err != redis.Nil {
return errors.Wrapf(err, "hdel %v %v", SRS_STREAM_ACTIVE, streamURL)
}

ohttp.WriteData(ctx, w, r, nil)
logger.Tf(ctx, "kickoff stream ok, code=%v, token=%vB", code, len(token))
return nil
}(); err != nil {
ohttp.WriteError(ctx, w, r, err)
}
})
}

func handleMgmtUI(ctx context.Context, handler *http.ServeMux) {
// Serve UI at platform.
fileRoot := path.Join(conf.Pwd, "../ui/build", os.Getenv("REACT_APP_LOCALE"))
Expand Down
110 changes: 110 additions & 0 deletions test/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -257,3 +258,112 @@ func TestSystem_CandidateByHostIp(t *testing.T) {
return
}
}

func TestSystem_WithStream_PublishRtmpKickoff(t *testing.T) {
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsLongTimeout)*time.Millisecond)
defer cancel()

if *noMediaTest {
return
}

var r0, r1, r2, r3, r4, r5 error
defer func(ctx context.Context) {
if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4, r5); err != nil {
t.Errorf("Fail for err %+v", err)
} else {
logger.Tf(ctx, "test done")
}
}(ctx)

var pubSecret string
if err := NewApi().WithAuth(ctx, "/terraform/v1/hooks/srs/secret/query", nil, &struct {
Publish *string `json:"publish"`
}{
Publish: &pubSecret,
}); err != nil {
r0 = err
return
}

var wg sync.WaitGroup
defer wg.Wait()

// Start FFmpeg to publish stream.
streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
streamURL := fmt.Sprintf("%v/live/%v?secret=%v", *endpointRTMP, streamID, pubSecret)
ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
v.args = []string{
"-re", "-stream_loop", "-1", "-i", *srsInputFile, "-c", "copy",
"-f", "flv", streamURL,
}
})
wg.Add(1)
go func() {
defer wg.Done()
r1 = ffmpeg.Run(ctx, cancel)
}()

// Wait for stream to be ready.
select {
case <-ctx.Done():
case <-time.After(5 * time.Second):
}

// Query all streams.
type StreamQueryResult struct {
Vhost string `json:"vhost"`
App string `json:"app"`
Stream string `json:"stream"`
Client string `json:"client_id"`
}
var streams []StreamQueryResult
if err := NewApi().WithAuth(ctx, "/terraform/v1/mgmt/streams/query", nil, &struct {
Streams *[]StreamQueryResult `json:"streams"`
}{
Streams: &streams,
}); err != nil {
r0 = err
return
}

// Find the target stream.
var stream *StreamQueryResult
for _, s := range streams {
if s.Stream == streamID {
stream = &s
break
}
}
if stream == nil {
r0 = errors.Errorf("stream %v not found", streamID)
return
}

// Kickoff the stream.
if err := NewApi().WithAuth(ctx, "/terraform/v1/mgmt/streams/kickoff", stream, nil); err != nil {
r0 = err
return
}

streams = nil
if err := NewApi().WithAuth(ctx, "/terraform/v1/mgmt/streams/query", nil, &struct {
Streams *[]StreamQueryResult `json:"streams"`
}{
Streams: &streams,
}); err != nil {
r0 = err
return
}

for _, s := range streams {
if s.Stream == streamID {
r0 = errors.Errorf("stream %v should be kicked off", streamID)
return
}
}

time.Sleep(3 * time.Second)
logger.Tf(ctx, "kickoff ok")
cancel()
}
Loading

0 comments on commit 043b7c6

Please sign in to comment.