Images into video streams

Matt Clark mclark4386 at gmail.com
Tue Mar 8 01:31:43 UTC 2022


*Context:* I'm making a server that composites various sources of media
into a video stream (like a 2x2 grid from different sources, for instance)
and supports multiple streams at the same time (including being able to
update those pipelines dynamically with web calls in the future). One
source is an image (image/png) that gets generated upon request by a
separate microservice. I was trying to use an AppSrc to pull the image and
then push it into a PngDec --> ImageFreeze and then link that to the
composite element with the sink properties for which part of the grid it
should show on.
I've got it pulling the image and pushing that buffer, but I'm getting the
error below, and I'm not sure exactly what's causing it.

*Questions:*
1. What do you think is happening and what's the best way to resolve it?
2. Is there a better way for me to do this?
3. Knowing that I plan to have multiple streams pulling from multiple
sources and want to update them live, any pointers?

*Error:*

should have image

should have image in pipeline: Ok

Error from
> Some("/GstPipeline:pipeline_6be3b6be-0767-49db-a2b1-26708950261b/GstAppSrc:vsource0_top_left"):
> Internal data stream error. (Some("../libs/gst/base/gstbasesrc.c(3127):
> gst_base_src_loop ():
> /GstPipeline:pipeline_6be3b6be-0767-49db-a2b1-26708950261b/GstAppSrc:vsource0_top_left:\nstreaming
> stopped, reason not-negotiated (-4)"))
>

*Code:*
// Builds an appsrc -> pngdec -> imagefreeze chain, hands the imagefreeze
// element to `gst_add_element_to_comp`, and installs a `need_data` callback
// that POSTs to the configured URL and pushes the response body downstream.
let vsource = gst_create_element("appsrc", name)?;

// Request target and request body; both stay empty unless `src_properties`
// supplies them.
let mut location = "".to_string();
let mut body = "".to_string();

// Only the "url" and "payload" keys are read here; any other key is ignored.
for (key, value) in src_properties.iter() {
if key == "url" {
location = value.to_string();
} else if key == "payload" {
body = String::from(value);
}
}

// Element names are built from both `name` and `index`.
// NOTE(review): the line breaks inside the next two statements look like
// mail line-wrapping rather than intentional formatting — confirm against
// the real source.
let pngdec = gst_create_element("pngdec",format!("pngdec_{}_{}",name,index).
as_str())?;
let imagefreeze = gst_create_element("imagefreeze",format!("imagefreeze_{}_{
}",name,index).as_str())?;

println!("adding source");
// The guard is dropped explicitly right after `add_many`, so the pipeline
// lock is not held while linking below.
let local_pipeline = pipeline.lock().unwrap();
local_pipeline.add_many(&[&vsource, &pngdec, &imagefreeze])?;
drop(local_pipeline);

// Link appsrc -> pngdec -> imagefreeze, in that order.
gst::Element::link_many(&[&vsource, &pngdec, &imagefreeze])?;

// `None` is passed in the caps position (the `src_caps` argument is
// commented out), so no caps filter is requested for the imagefreeze link.
gst_add_element_to_comp(
pipeline,
&imagefreeze,
name,
None,
// src_caps,
sink_properties,
comp,
index,
)?;

// Panics (via `expect`) if the created element cannot be cast to `AppSrc`.
let appsrc = vsource
.dynamic_cast::<gst_app::AppSrc>()
.expect("Source element is expected to be an appsrc!");

// NOTE(review): `src_caps` is not defined in this snippet. The buffers pushed
// below are the raw HTTP response body and the next element is `pngdec`, so
// these caps presumably need to describe PNG data (e.g. `image/png`) rather
// than raw video; a mismatch would be consistent with the reported
// "not-negotiated" error — confirm.
appsrc.set_caps(src_caps);
appsrc.set_format(gst::Format::Bytes);

appsrc.set_callbacks(
gst_app::AppSrcCallbacks::builder()
// `move` transfers ownership of `location` and `body` into the callback.
.need_data(move |appsrc, _| {
// With no URL configured, signal end-of-stream instead of requesting data.
if location == "" {
let _ = appsrc.end_of_stream();
return;
}

println!("Producing frame");
// A fresh blocking HTTP client is created and a POST is issued on every
// invocation of this callback.
let client = reqwest::blocking::Client::new();
let our_body = body.clone();
let resp_result = client.post(location.as_str()).body(our_body).send();
match resp_result {
Ok(resp) => {
if resp.status().is_success() {
println!("should have image");
match resp.bytes() {
Ok(bytes) => {
// The whole response body becomes a single buffer.
let buf = gst::Buffer::from_slice(bytes);
// A failed push is only printed; it is not posted as an
// element error.
match appsrc.push_buffer(buf) {
Ok(suc) => {
println!("should have image in pipeline: {:?}", suc)
}
Err(err) => println!(
"error putting image in pipeline: {:?}",
err
),
};
}
Err(err) => {
// Reading the body failed: post an error message from this element.
// NOTE(review): "filename", "function" and 100 look like
// hard-coded placeholders for the source location — confirm.
appsrc.post_error_message(ErrorMessage::new(
&gst::CoreError::Failed,
Some(
format!("no bytes from image request:{:?}", err)
.as_str(),
),
Some(
format!("no bytes from image request:{:?}", err)
.as_str(),
),
"filename",
"function",
100,
));
return;
}
}
} else {
// NOTE(review): a non-success status is only printed — no buffer is
// pushed and no error message is posted in this branch.
println!("error getting image: {:?}", resp.status());
}
}
Err(err) => {
// The request itself failed: post an error message from this element.
appsrc.post_error_message(ErrorMessage::new(
&gst::CoreError::Failed,
Some(format!("no response from image request:{:?}", err).as_str()),
Some(format!("no response from image request:{:?}", err).as_str()),
"filename",
"function",
116,
));
return;
}
}
})
.build(),
);
/// Connects `vsource` to the compositor `comp` through a freshly created
/// `videoconvert` and `queue`.
///
/// Assumes that `vsource` is already in the pipeline; only the new
/// `videoconvert` and `queue` elements are added here.
///
/// # Arguments
/// * `pipeline` - shared pipeline that receives the two new elements.
/// * `vsource` - upstream element whose output is linked to the `videoconvert`.
/// * `name` - used to build the names of the new elements.
/// * `src_caps` - when `Some`, the `vsource` -> `videoconvert` link is
///   filtered with these caps; when `None`, a plain link is made.
/// * `sink_properties` - property name/value pairs applied (as strings) to the
///   requested compositor sink pad.
/// * `comp` - element on which the `sink_{index}` pad is requested.
/// * `index` - used in the requested pad name and in the queue name.
///
/// # Errors
/// Returns an error if an element cannot be created, the `sink_{index}` pad
/// cannot be requested, the pipeline mutex is poisoned, the elements cannot
/// be added to the pipeline, or any link fails.
pub fn gst_add_element_to_comp(
    pipeline: &Arc<Mutex<gst::Pipeline>>,
    vsource: &gst::Element,
    name: &str,
    src_caps: Option<&gst::Caps>,
    sink_properties: HashMap<String, String>,
    comp: &gst::Element,
    index: i32,
) -> Result<()> {
    let vconvert = gst_create_element("videoconvert", &format!("vconvert_{}", name))?;
    let vqueue = gst_create_element("queue", &format!("v{}_queue_for_{}", index, name))?;

    // A missing pad (e.g. the name is already taken) is reported to the caller
    // instead of panicking.
    let pad_name = format!("sink_{}", index);
    let sink = comp
        .request_pad_simple(&pad_name)
        .ok_or_else(|| eyre!("could not request pad {} on the compositor", pad_name))?;
    for (key, value) in &sink_properties {
        sink.set_property_from_str(key, value);
    }

    vqueue.set_property_from_str("leaky", "1");

    {
        // Keep the lock only for the duration of `add_many`; a poisoned mutex
        // becomes an error rather than a panic.
        let local_pipeline = pipeline
            .lock()
            .map_err(|_| eyre!("pipeline mutex is poisoned"))?;
        local_pipeline.add_many(&[&vconvert, &vqueue])?;
    }

    if let Some(caps) = src_caps {
        println!("link with caps: {:?}", caps);
        vsource.link_filtered(&vconvert, caps)?;
    } else {
        println!("link");
        vsource.link(&vconvert)?;
    }

    // Log the link failure before handing it back to the caller.
    gst::Element::link_many(&[&vconvert, &vqueue, comp]).map_err(|err| {
        println!("Got error:{:?}", err);
        eyre!(err)
    })
}
The Pipeline and composite elements are created in the logic before these
and I have test patterns working great, it's just when I start getting
fancier that this issue pops up.

Thanks for any input! I've been coding for a long time, but I'm fairly new to
Rust and GStreamer, so apologies if this is some stupid issue. I'd love any
pointers/wisdom you have to share!
-Matt
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/gstreamer-devel/attachments/20220307/2025c516/attachment-0001.htm>


More information about the gstreamer-devel mailing list