Skip to content

Commit

Permalink
Convencience API for iterating frames
Browse files Browse the repository at this point in the history
  • Loading branch information
keepingitneil committed Aug 20, 2024
1 parent 1689545 commit 3a93b61
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 6 deletions.
4 changes: 4 additions & 0 deletions libwebrtc/src/video_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub mod native {
pub fn close(&mut self) {
self.handle.close();
}

pub fn set_native_video_stream(&mut self, native_video_stream: stream_imp::NativeVideoStream) {
self.handle = native_video_stream;
}
}

impl Stream for NativeVideoStream {
Expand Down
9 changes: 9 additions & 0 deletions livekit-ffi/protocol/audio_frame.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package livekit.proto;
option csharp_namespace = "LiveKit.Proto";

import "handle.proto";
import "track.proto";

// Create a new AudioStream
// AudioStream is used to receive audio frames from a track
Expand All @@ -27,6 +28,14 @@ message NewAudioStreamRequest {
}
message NewAudioStreamResponse { OwnedAudioStream stream = 1; }

message AudioStreamFromParticipantRequest {
uint64 participant_handle = 1;
AudioStreamType type = 2;
optional TrackSource track_source = 3;
}

message AudioStreamFromParticipantResponse { OwnedAudioStream stream = 1; }

// Create a new AudioSource
message NewAudioSourceRequest {
AudioSourceType type = 1;
Expand Down
2 changes: 2 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ message FfiRequest {
NewVideoSourceRequest new_video_source = 19;
CaptureVideoFrameRequest capture_video_frame = 20;
VideoConvertRequest video_convert = 21;
VideoStreamFromParticipantRequest video_stream_from_participant = 22;

// Audio
NewAudioStreamRequest new_audio_stream = 23;
Expand All @@ -88,6 +89,7 @@ message FfiRequest {
NewAudioResamplerRequest new_audio_resampler = 26;
RemixAndResampleRequest remix_and_resample = 27;
E2eeRequest e2ee = 28;
AudioStreamFromParticipantRequest audio_stream_from_participant = 29;
}
}

Expand Down
12 changes: 12 additions & 0 deletions livekit-ffi/protocol/video_frame.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package livekit.proto;
option csharp_namespace = "LiveKit.Proto";

import "handle.proto";
import "track.proto";

// Create a new VideoStream
// VideoStream is used to receive video frames from a track
Expand All @@ -30,6 +31,17 @@ message NewVideoStreamRequest {
}
message NewVideoStreamResponse { OwnedVideoStream stream = 1; }

// Request a video stream from a participant
message VideoStreamFromParticipantRequest {
uint64 participant_handle = 1;
VideoStreamType type = 2;
TrackSource track_source = 3;
optional VideoBufferType format = 4;
bool normalize_stride = 5;
}

message VideoStreamFromParticipantResponse { OwnedVideoStream stream = 1;}

// Create a new VideoSource
// VideoSource is used to send video frame to a track
message NewVideoSourceRequest {
Expand Down
44 changes: 43 additions & 1 deletion livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FrameCryptor {
Expand Down Expand Up @@ -1542,6 +1543,27 @@ pub struct NewVideoStreamResponse {
#[prost(message, optional, tag="1")]
pub stream: ::core::option::Option<OwnedVideoStream>,
}
/// Request a video stream from a participant
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct VideoStreamFromParticipantRequest {
#[prost(uint64, tag="1")]
pub participant_handle: u64,
#[prost(enumeration="VideoStreamType", tag="2")]
pub r#type: i32,
#[prost(enumeration="TrackSource", tag="3")]
pub track_source: i32,
#[prost(enumeration="VideoBufferType", optional, tag="4")]
pub format: ::core::option::Option<i32>,
#[prost(bool, tag="5")]
pub normalize_stride: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct VideoStreamFromParticipantResponse {
#[prost(message, optional, tag="1")]
pub stream: ::core::option::Option<OwnedVideoStream>,
}
/// Create a new VideoSource
/// VideoSource is used to send video frame to a track
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -2847,6 +2869,22 @@ pub struct NewAudioStreamResponse {
#[prost(message, optional, tag="1")]
pub stream: ::core::option::Option<OwnedAudioStream>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AudioStreamFromParticipantRequest {
#[prost(uint64, tag="1")]
pub participant_handle: u64,
#[prost(enumeration="AudioStreamType", tag="2")]
pub r#type: i32,
#[prost(enumeration="TrackSource", optional, tag="3")]
pub track_source: ::core::option::Option<i32>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AudioStreamFromParticipantResponse {
#[prost(message, optional, tag="1")]
pub stream: ::core::option::Option<OwnedAudioStream>,
}
/// Create a new AudioSource
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -3118,7 +3156,7 @@ impl AudioSourceType {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 23, 24, 25, 26, 27, 28")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -3169,6 +3207,8 @@ pub mod ffi_request {
CaptureVideoFrame(super::CaptureVideoFrameRequest),
#[prost(message, tag="21")]
VideoConvert(super::VideoConvertRequest),
#[prost(message, tag="22")]
VideoStreamFromParticipant(super::VideoStreamFromParticipantRequest),
/// Audio
#[prost(message, tag="23")]
NewAudioStream(super::NewAudioStreamRequest),
Expand All @@ -3182,6 +3222,8 @@ pub mod ffi_request {
RemixAndResample(super::RemixAndResampleRequest),
#[prost(message, tag="28")]
E2ee(super::E2eeRequest),
#[prost(message, tag="29")]
AudioStreamFromParticipant(super::AudioStreamFromParticipantRequest),
}
}
/// This is the output of livekit_ffi_request function.
Expand Down
1 change: 1 addition & 0 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub mod colorcvt;
pub mod logger;
pub mod requests;
pub mod room;
mod utils;
pub mod video_source;
pub mod video_stream;

Expand Down
51 changes: 51 additions & 0 deletions livekit-ffi/src/server/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use livekit::prelude::{RoomEvent, Track, TrackSource};
use tokio::sync::mpsc;

use super::room::FfiParticipant;

pub async fn track_changed_trigger(
participant: FfiParticipant,
track_source: TrackSource,
track_tx: mpsc::Sender<Track>,
) {
for track_pub in participant.participant.track_publications().values() {
if track_pub.source() == track_source.into() {
let track = track_pub.track();
match track {
Some(track) => {
track_tx.send(track).await;
}
_ => {}
}
}
}
let room = &participant.room.room;
let mut room_event_rx = room.subscribe();
while let Some(event) = room_event_rx.recv().await {
match event {
RoomEvent::TrackPublished { publication, participant: p } => {
if participant.participant.identity() != p.identity() {
continue;
}
if publication.source() == track_source.into() {
let track = publication.track();
match track {
Some(track) => {
track_tx.send(track.into()).await;
}
_ => {}
}
}
}
RoomEvent::ParticipantDisconnected(participant) => {
if participant.identity() == participant.identity() {
break;
}
}
RoomEvent::Disconnected { reason: _ } => {
break;
}
_ => {}
}
}
}
91 changes: 87 additions & 4 deletions livekit-ffi/src/server/video_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@
// limitations under the License.

use futures_util::StreamExt;
use livekit::webrtc::{prelude::*, video_stream::native::NativeVideoStream};
use tokio::sync::oneshot;

use super::{colorcvt, room::FfiTrack, FfiHandle};
use livekit::{
prelude::Track,
webrtc::{prelude::*, video_stream::native::NativeVideoStream},
};
use tokio::sync::{mpsc, oneshot};

use super::{
colorcvt,
room::{FfiParticipant, FfiTrack},
FfiHandle,
};
use crate::server::utils;
use crate::{proto, server, FfiError, FfiHandleId, FfiResult};

pub struct FfiVideoStream {
Expand Down Expand Up @@ -141,4 +149,79 @@ impl FfiVideoStream {
log::warn!("failed to send video EOS: {}", err);
}
}

async fn participant_video_stream_task(
server: &'static server::FfiServer,
request: proto::VideoStreamFromParticipantRequest,
stream_handle: FfiHandleId,
participant_handle: FfiHandleId,
mut close_rx: oneshot::Receiver<()>,
dst_type: Option<proto::VideoBufferType>,
normalize_stride: bool,
) {
let p = server.retrieve_handle::<FfiParticipant>(participant_handle);
let p = match p {
Ok(p) => p,
Err(err) => {
log::error!("failed to retrieve participant: {}", err);
return;
}
};
let track_source = request.track_source();
let (track_tx, mut track_rx) = mpsc::channel::<Track>(1);
server.async_runtime.spawn(utils::track_changed_trigger(
p.clone(),
track_source.into(),
track_tx,
));
// track_tx is no longer held, so the track_rx will be closed

loop {
let track = track_rx.recv().await;
if let Some(track) = track {
let rtc_track = track.rtc_track();
let MediaStreamTrack::Video(rtc_track) = rtc_track else {
continue;
};

Self::native_video_stream_task(
server,
stream_handle,
dst_type,
normalize_stride,
NativeVideoStream::new(rtc_track),
close_rx,
)
.await;
} else {
break;
}
}
}

pub fn from_participant(
server: &'static server::FfiServer,
request: proto::VideoStreamFromParticipantRequest,
) -> FfiResult<proto::OwnedVideoStream> {
let ffi_participant =
server.retrieve_handle::<FfiParticipant>(request.participant_handle)?.clone();
let (close_tx, close_rx) = oneshot::channel();
let stream_type = request.r#type();
let handle_id = server.next_id();
let stream = match stream_type {
#[cfg(not(target_arch = "wasm32"))]
proto::VideoStreamType::VideoStreamNative => {
let video_stream = Self { handle_id, close_tx, stream_type };
Ok::<FfiVideoStream, FfiError>(video_stream)
}
_ => return Err(FfiError::InvalidRequest("unsupported video stream type".into())),
}?;
let info = proto::VideoStreamInfo::from(&stream);
server.store_handle(stream.handle_id, stream);

Ok(proto::OwnedVideoStream {
handle: Some(proto::FfiOwnedHandle { id: handle_id }),
info: Some(info),
})
}
}
2 changes: 1 addition & 1 deletion livekit-protocol/protocol
Submodule protocol updated 49 files
+5 −0 .changeset/brown-rats-visit.md
+5 −0 .changeset/fuzzy-crews-refuse.md
+5 −0 .changeset/healthy-papayas-join.md
+12 −0 .changeset/mighty-steaks-invite.md
+5 −0 .changeset/proud-gorillas-press.md
+5 −0 .changeset/tough-dingos-love.md
+0 −37 CHANGELOG.md
+1 −1 auth/grants.go
+20 −19 go.mod
+38 −38 go.sum
+41 −28 infra/link_grpc.pb.go
+866 −318 livekit/livekit_agent.pb.go
+0 −674 livekit/livekit_agent_dispatch.pb.go
+10 −7 livekit/livekit_agent_grpc.pb.go
+71 −74 livekit/livekit_egress.pb.go
+187 −187 livekit/livekit_egress.twirp.go
+250 −261 livekit/livekit_internal.pb.go
+51 −65 livekit/livekit_models.pb.go
+223 −224 livekit/livekit_room.pb.go
+81 −81 livekit/livekit_room.twirp.go
+356 −365 livekit/livekit_rtc.pb.go
+0 −40 livekit/types.go
+0 −100 livekit/types_test.go
+3 −1 magefile.go
+1 −1 package.json
+0 −18 packages/javascript/CHANGELOG.md
+2 −2 packages/javascript/package.json
+51 −6 protobufs/livekit_agent.proto
+0 −69 protobufs/livekit_agent_dispatch.proto
+0 −1 protobufs/livekit_egress.proto
+1 −2 protobufs/livekit_internal.proto
+0 −11 protobufs/livekit_models.proto
+1 −1 protobufs/livekit_room.proto
+0 −1 protobufs/livekit_rtc.proto
+1 −5 protobufs/rpc/agent.proto
+11 −11 replay/cloud_replay.pb.go
+25 −91 rpc/agent.pb.go
+29 −31 rpc/agent.psrpc.go
+160 −48 rpc/analytics_grpc.pb.go
+16 −16 rpc/participant.psrpc.go
+12 −12 rpc/room.psrpc.go
+2 −13 utils/configobserver.go
+0 −10 utils/configobserver_test.go
+0 −100 utils/configutil/atomic.go
+0 −74 utils/deepcopy.go
+0 −46 utils/deepcopy_test.go
+39 −97 utils/event_emitter.go
+0 −24 utils/event_emitter_test.go
+4 −77 webhook/url_notifier.go

0 comments on commit 3a93b61

Please sign in to comment.