Appsink missing the "last" buffer

arunzg arungovindan92 at gmail.com
Fri Aug 30 05:03:32 UTC 2019


Hi,

I have an appsink connected to hlsdemux through a dynamically added pad. It
works, but the output is missing what seems to be the last buffer (about 0.5
seconds of TS packets). In this test two different streams are played, and in
both cases the "last buffer" is missing. Am I missing anything here?

#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <gst/gst.h>
#include <gst/app/gstappsink.h>

/* Capacities (including the terminating NUL) of the fixed-size string
 * buffers stored in CustomData. */
#define MAX_URL_LEN 150
#define MAX_FILENAME_LEN 100

/* Shared application state. A pointer to one instance is handed to every
 * signal callback as its user-data argument. */
typedef struct _CustomData {
    /* GStreamer elements created in main(). */
    GstElement *pipeline;   /* top-level pipeline that owns the elements */
    GstElement *httpsrc;    /* "souphttpsrc" source element */
    GstElement *hlsDemux;   /* "hlsdemux"; its pads appear dynamically */
    GstElement *filesink;   /* not assigned or used by the code in this file */
    GstElement *appsink1;   /* "appsink" whose samples are written to fHandle */

    /* Command-line arguments, copied into fixed-size buffers. */
    char url[MAX_URL_LEN];            /* playlist location given to httpsrc */
    char outFile[MAX_FILENAME_LEN];   /* path of the file currently written */

    int fHandle;        /* descriptor returned by open() for outFile */
    GstPad *curr_pad;   /* demuxer pad most recently linked to appsink1 */
} CustomData;


/* Forward declaration of the "pad-added" signal callback; the definition
 * follows main(). */
static void pad_added_handler(GstElement *src,
                              GstPad *pad,
                              CustomData *data);

/* Called when the appsink notifies us that there is a new sample ready for
 * processing. Pulls the sample, writes its payload to data->fHandle and
 * releases the sample again.
 *
 * elt:  the appsink that emitted "new-sample".
 * data: application state; only data->fHandle is used here.
 *
 * Returns GST_FLOW_OK when the payload was written completely, GST_FLOW_EOS
 * when no sample could be pulled, and GST_FLOW_ERROR when the buffer could
 * not be mapped or the write failed. */
static GstFlowReturn
on_new_sample_from_sink(GstElement * elt, CustomData * data)
{
    GstSample *sample;
    GstBuffer *buffer;
    GstFlowReturn ret = GST_FLOW_OK;
    GstMapInfo map;

    /* get the sample from appsink; NULL means the sink is EOS or stopped */
    sample = gst_app_sink_pull_sample(GST_APP_SINK(elt));
    if (sample == NULL)
        return GST_FLOW_EOS;

    /* the buffer is owned by the sample, so it needs no unref of its own */
    buffer = gst_sample_get_buffer(sample);
    if (buffer != NULL)
    {
        if (gst_buffer_map(buffer, &map, GST_MAP_READ))
        {
            const guint8 *cursor = map.data;
            gsize remaining = map.size;

            /* write() may accept fewer bytes than requested; keep going
             * until the whole payload is on disk */
            while (remaining > 0)
            {
                ssize_t written = write(data->fHandle, cursor, remaining);

                if (written <= 0)
                {
                    g_printerr("Failed to write sample data to output file\n");
                    ret = GST_FLOW_ERROR;
                    break;
                }
                cursor += written;
                remaining -= (gsize) written;
            }

            /* every successful map must be paired with an unmap */
            gst_buffer_unmap(buffer, &map);
        }
        else
        {
            g_printerr("Failed to map sample buffer for reading\n");
            ret = GST_FLOW_ERROR;
        }
    }

    /* we don't need the appsink sample anymore */
    gst_sample_unref(sample);

    return ret;
}

int main(int argc, char *argv[])
{
    CustomData data;
    GstBus *bus;
    GstMessage *msg;
    GstStateChangeReturn ret;
    gboolean terminate = FALSE;

    if (argc < 3)
    {
        g_printerr("Usage: %s url outfile\n", argv[0]);
        return -1;
    }

    strcpy(data.url, argv[1]);
    strcpy(data.outFile, argv[2]);
    // open output file. 
    data.fHandle = open(data.outFile, O_WRONLY | O_CREAT | O_TRUNC,
        S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);

    if (data.fHandle < 0)
    {
        g_printerr("Could not open file '%s'\n", argv[2]);
        return -1;;
    }

    /* Initialize GStreamer */
    gst_init(&argc, &argv);

    /* Create the elements */
    data.httpsrc = gst_element_factory_make("souphttpsrc", "source");
    data.hlsDemux = gst_element_factory_make("hlsdemux", "hlsdemux");
    data.appsink1 = gst_element_factory_make("appsink", "appsink1");

    /* Create the empty pipeline */
    data.pipeline = gst_pipeline_new("test-pipeline");

    if (!data.pipeline || !data.httpsrc || !data.hlsDemux || !data.appsink1
)
    {
        g_printerr("Not all elements could be created.\n");
        return -1;
    }

    /* Build the pipeline. Note that we are NOT linking the appsink at this
     * point. We will do it later. */
    gst_bin_add_many(GST_BIN(data.pipeline), data.httpsrc, data.hlsDemux,
data.appsink1,  NULL);
    if (!gst_element_link(data.httpsrc, data.hlsDemux))
    {
        g_printerr("element link failed\n");
        gst_object_unref(data.pipeline);
        return -1;
    }


    /* Modify the source's properties */

    /* Set the URI to play */
 //   g_object_set(data.httpsrc, "location",
"http://devimages.apple.com/iphone/samples/bipbop/gear4/prog_index.m3u8",
NULL);
    g_object_set(data.httpsrc, "location", data.url, NULL);
    /* Set properties of appsink*/
    g_object_set(data.appsink1, "emit-signals", TRUE, "sync", TRUE, NULL);
    g_signal_connect(data.appsink1, "new-sample",
        G_CALLBACK(on_new_sample_from_sink), &data);


    /* Connect to the pad-added signal */
    g_signal_connect(data.hlsDemux, "pad-added",
G_CALLBACK(pad_added_handler), &data);
    /* Start playing */
    ret = gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
    if (ret == GST_STATE_CHANGE_FAILURE)
    {
        g_printerr("Unable to set the pipeline to the playing state.\n");
        gst_object_unref(data.pipeline);
        return -1;
    }

    /* Listen to the bus */
    bus = gst_element_get_bus(data.pipeline);
    do
    {
        msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE,
            GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_ERROR |
GST_MESSAGE_EOS);

        /* Parse message */
        if (msg != NULL)
        {
            GError *err;
            gchar *debug_info;

            switch (GST_MESSAGE_TYPE(msg))
            {
            case GST_MESSAGE_ERROR:
                gst_message_parse_error(msg, &err, &debug_info);
                g_printerr("Error received from element %s: %s\n",
GST_OBJECT_NAME(msg->src), err->message);
                g_printerr("Debugging information: %s\n", debug_info ?
debug_info : "none");
                g_clear_error(&err);
                g_free(debug_info);
                terminate = TRUE;
                break;
            case GST_MESSAGE_EOS:
                g_print("End-Of-Stream reached.\n");
                terminate = TRUE;
                break;
            case GST_MESSAGE_STATE_CHANGED:
                /* We are only interested in state-changed messages from the
pipeline */
                if (GST_MESSAGE_SRC(msg) == GST_OBJECT(data.pipeline))
                {
                    GstState old_state, new_state, pending_state;
                    gst_message_parse_state_changed(msg, &old_state,
&new_state, &pending_state);
                    g_print("Pipeline state changed from %s to %s:\n",
                        gst_element_state_get_name(old_state),
gst_element_state_get_name(new_state));
                }
                break;
            default:
                /* We should not reach here */
                g_printerr("Unexpected message received.\n");
                break;
            }
            gst_message_unref(msg);
        }
    } while (!terminate);

    /* Free resources */
    gst_object_unref(bus);
    gst_element_set_state(data.pipeline, GST_STATE_NULL);
    gst_object_unref(data.pipeline);
    close(data.fHandle);
    return 0;
}


/* This function will be called by the pad-added signal.
 *
 * Links new_pad to the appsink's sink pad. If the sink pad is already linked
 * (a new stream has started), the existing link is removed first, the current
 * output file is closed, and a new file named "<outFile>_1" is opened so that
 * each stream ends up in its own file.
 *
 * src:     element that created the pad.
 * new_pad: the freshly added source pad.
 * data:    application state; appsink1, outFile, fHandle and curr_pad are
 *          read and/or updated.
 *
 * Any failure is reported on the console and leaves new_pad unlinked. */
static void pad_added_handler(GstElement *src, GstPad *new_pad,
    CustomData *data)
{
    GstPad *sink_pad = gst_element_get_static_pad(data->appsink1, "sink");
    GstPad *old_peer = NULL;
    GstPadLinkReturn ret;
    GstCaps *new_pad_caps = NULL;
    const gchar *new_pad_type = "unknown";

    g_print("Received new pad '%s' from '%s' (%p:%p):\n",
        GST_PAD_NAME(new_pad), GST_ELEMENT_NAME(src),
        (void *) new_pad, (void *) data->curr_pad);

    /* Nothing can be done (or released) without the sink pad. */
    if (!sink_pad)
    {
        g_print("couldnt get appsink pad\n");
        return;
    }

    /* Check the new pad's type. The pad may not have caps yet, in which case
     * the type is reported as "unknown". */
    new_pad_caps = gst_pad_get_current_caps(new_pad);
    if (new_pad_caps != NULL)
    {
        GstStructure *new_pad_struct =
            gst_caps_get_structure(new_pad_caps, 0);

        if (new_pad_struct != NULL)
            new_pad_type = gst_structure_get_name(new_pad_struct);
    }

    /* If the appsink is already linked, switch it over to the new stream */
    if (gst_pad_is_linked(sink_pad))
    {
        size_t len = strlen(data->outFile);

        g_print("We are already linked sink1. type=%s\n", new_pad_type);

        /* Make sure the "_1" suffix fits before touching any state. */
        if (len + sizeof "_1" > sizeof data->outFile)
        {
            g_printerr("Output file name '%s' is too long to extend\n",
                data->outFile);
            goto exit;
        }

        /* Unlink the appsink from its current peer. The peer is queried from
         * the sink pad itself because data->curr_pad is stored without a
         * reference and may no longer be valid.
         * NOTE(review): unlinking immediately discards anything the previous
         * pad has not pushed yet; this may be why the tail of the previous
         * stream is missing - confirm. */
        old_peer = gst_pad_get_peer(sink_pad);
        if (old_peer == NULL || !gst_pad_unlink(old_peer, sink_pad))
        {
            g_print("Type is '%s' unlink failed.\n", new_pad_type);
            goto exit;
        }
        g_print("UnLink succeeded (type '%s').\n", new_pad_type);
        data->curr_pad = NULL;

        /* New stream starting: close the current file and open another one
         * so the two transport streams are not mixed in a single file. */
        close(data->fHandle);
        memcpy(data->outFile + len, "_1", sizeof "_1");
        g_print("Writing to new file %s\n", data->outFile);
        data->fHandle = open(data->outFile, O_WRONLY | O_CREAT | O_TRUNC,
            S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);

        if (data->fHandle < 0)
        {
            g_printerr("Could not open file '%s'\n", data->outFile);
            goto exit;
        }
    }

    /* Attempt the link */
    ret = gst_pad_link(new_pad, sink_pad);
    if (GST_PAD_LINK_FAILED(ret))
    {
        g_print("Type is '%s' link failed(%d).\n", new_pad_type, ret);
    }
    else
    {
        g_print("Link succeeded (type '%s').\n", new_pad_type);
        data->curr_pad = new_pad;
    }

exit:
    /* Release the reference on the previous peer, if we looked it up */
    if (old_peer != NULL)
        gst_object_unref(old_peer);

    /* Unreference the new pad's caps, if we got them */
    if (new_pad_caps != NULL)
        gst_caps_unref(new_pad_caps);

    /* Unreference the sink pad */
    gst_object_unref(sink_pad);
}



--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/


More information about the gstreamer-devel mailing list