From a75f52039a8576969d3b2ad280884b9f04cfdc6a Mon Sep 17 00:00:00 2001 From: Josh Allmann Date: Thu, 19 Dec 2024 20:52:13 +0000 Subject: [PATCH] ai/live: Send event when auth fails --- media/rtmp2segment.go | 2 +- server/ai_live_video.go | 2 +- server/ai_mediaserver.go | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/media/rtmp2segment.go b/media/rtmp2segment.go index db1d5a8eb..c1040097b 100644 --- a/media/rtmp2segment.go +++ b/media/rtmp2segment.go @@ -59,7 +59,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen Muxer: ffmpeg.ComponentOptions{Name: "segment"}, }}) if err != nil { - clog.Errorf(ctx, "Failed to run segmentation. in=%s err=%s", in, err) + clog.Errorf(ctx, "Failed to run segmentation. in=%s try=%d err=%s", in, retryCount, err) } retryCount++ time.Sleep(5 * time.Second) diff --git a/server/ai_live_video.go b/server/ai_live_video.go index edaebdd6e..5f6966af7 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -253,7 +253,7 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar go func() { defer StreamStatusStore.Clear(streamId) for { - clog.Infof(ctx, "Reading from event subscription for URL: %s", url.String()) + clog.V(8).Infof(ctx, "Reading from event subscription for URL: %s", url.String()) segment, err := subscriber.Read() if err != nil { clog.Infof(ctx, "Error reading events subscription: %s", err) diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index 4099a8e1d..d30d948d5 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -430,7 +430,8 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler { // if auth webhook returns pipeline config these will be replaced pipeline := qp.Get("pipeline") rawParams := qp.Get("params") - var streamID, pipelineID string + streamID := streamName + var pipelineID string var pipelineParams map[string]interface{} if rawParams != "" { if err := json.Unmarshal([]byte(rawParams), &pipelineParams); err != nil { @@ -454,6 +455,10 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler { clog.Errorf(ctx, "failed to kick input connection: %s", kickErr.Error()) } clog.Errorf(ctx, "Live AI auth failed: %s", err.Error()) + monitor.SendQueueEventAsync("ai_stream_events", map[string]string{ + "type": "error", + "message": "Live AI auth failed: " + err.Error(), + }) http.Error(w, "Forbidden", http.StatusForbidden) return }