summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosep Torra <n770galaxy@gmail.com>2013-09-20 17:09:52 +0200
committerJosep Torra <n770galaxy@gmail.com>2013-09-20 17:09:52 +0200
commitec794a131e7f8c67b660c5952cd6d27f2b310d86 (patch)
tree5c16fabfb1810db5ab220938fa711c3e590339dc
parenta468fe053a2825a8a708d73f2eba9685de859eac (diff)
examples: fix a race condition when seeking
Fixes a race condition that caused pipeline deadlock during seeks.
-rw-r--r--examples/egl/testegl.c116
1 files changed, 75 insertions, 41 deletions
diff --git a/examples/egl/testegl.c b/examples/egl/testegl.c
index cb05b84..89a846b 100644
--- a/examples/egl/testegl.c
+++ b/examples/egl/testegl.c
@@ -149,11 +149,13 @@ typedef struct
/* GStreamer related resources */
GstElement *pipeline;
+ GstElement *vsink;
GstEGLDisplay *gst_display;
/* Interthread comunication */
GAsyncQueue *queue;
- GMutex *lock;
+ GMutex *queue_lock;
+ GMutex *flow_lock;
GCond *cond;
gboolean flushing;
GstMiniObject *popped_obj;
@@ -639,20 +641,16 @@ update_image (APP_STATE_T * state, GstBuffer * buffer)
{
GstEGLImageMemory *mem = (GstEGLImageMemory *) GST_BUFFER_DATA (buffer);
- g_mutex_lock (state->lock);
if (state->current_mem) {
gst_egl_image_memory_unref (state->current_mem);
}
state->current_mem = gst_egl_image_memory_ref (mem);
- g_mutex_unlock (state->lock);
TRACE_VC_MEMORY_ONCE_FOR_ID ("before glEGLImageTargetTexture2DOES", gid0);
glBindTexture (GL_TEXTURE_2D, state->tex);
glEGLImageTargetTexture2DOES (GL_TEXTURE_2D,
gst_egl_image_memory_get_image (mem));
TRACE_VC_MEMORY_ONCE_FOR_ID ("after glEGLImageTargetTexture2DOES", gid1);
-
- render_scene (state);
}
static void
@@ -660,7 +658,8 @@ init_intercom (APP_STATE_T * state)
{
state->queue =
g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
- state->lock = g_mutex_new ();
+ state->queue_lock = g_mutex_new ();
+ state->flow_lock = g_mutex_new ();
state->cond = g_cond_new ();
}
@@ -672,8 +671,12 @@ terminate_intercom (APP_STATE_T * state)
g_async_queue_unref (state->queue);
}
- if (state->lock) {
- g_mutex_free (state->lock);
+ if (state->queue_lock) {
+ g_mutex_free (state->queue_lock);
+ }
+
+ if (state->flow_lock) {
+ g_mutex_free (state->flow_lock);
}
if (state->cond) {
@@ -695,33 +698,47 @@ flush_start (APP_STATE_T * state)
{
GstMiniObject *object = NULL;
- g_mutex_lock (state->lock);
+ g_mutex_lock (state->queue_lock);
state->flushing = TRUE;
g_cond_broadcast (state->cond);
- g_mutex_unlock (state->lock);
+ g_mutex_unlock (state->queue_lock);
+ g_mutex_lock (state->flow_lock);
while ((object = GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) {
gst_mini_object_unref (object);
}
-
+ g_mutex_lock (state->queue_lock);
flush_internal (state);
+ state->popped_obj = NULL;
+ g_mutex_unlock (state->queue_lock);
+ g_mutex_unlock (state->flow_lock);
}
static void
flush_stop (APP_STATE_T * state)
{
- g_mutex_lock (state->lock);
+ GstMiniObject *object = NULL;
+
+ g_mutex_lock (state->queue_lock);
+ while ((object = GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) {
+ gst_mini_object_unref (object);
+ }
+ flush_internal (state);
state->popped_obj = NULL;
state->flushing = FALSE;
- g_mutex_unlock (state->lock);
+ g_mutex_unlock (state->queue_lock);
}
static void
pipeline_pause (APP_STATE_T * state)
{
- flush_start (state);
gst_element_set_state (state->pipeline, GST_STATE_PAUSED);
- flush_stop (state);
+}
+
+static void
+pipeline_play (APP_STATE_T * state)
+{
+ gst_element_set_state (state->pipeline, GST_STATE_PLAYING);
}
static gint64
@@ -731,7 +748,7 @@ pipeline_get_position (APP_STATE_T * state)
GstFormat format = GST_FORMAT_TIME;
if (state->pipeline) {
- gst_element_query_position (state->pipeline, &format, &position);
+ gst_element_query_position (state->vsink, &format, &position);
}
return position;
@@ -758,7 +775,7 @@ pipeline_seek (APP_STATE_T * state, gint64 position)
event = gst_event_new_seek (1.0,
GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT,
GST_SEEK_TYPE_SET, position, GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE);
- if (!gst_element_send_event (state->pipeline, event)) {
+ if (!gst_element_send_event (state->vsink, event)) {
g_print ("seek failed\n");
}
}
@@ -768,25 +785,27 @@ static gboolean
handle_queued_objects (APP_STATE_T * state)
{
GstMiniObject *object = NULL;
+ gboolean done = FALSE;
if (g_async_queue_length (state->queue) == 0) {
return FALSE;
}
- while ((object = GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) {
+ g_mutex_lock (state->queue_lock);
+ if (state->flushing) {
+ g_cond_broadcast (state->cond);
+ done = TRUE;
+ }
+ g_mutex_unlock (state->queue_lock);
- g_mutex_lock (state->lock);
- if (state->flushing) {
- state->popped_obj = object;
- gst_mini_object_unref (object);
- g_cond_broadcast (state->cond);
- g_mutex_unlock (state->lock);
- continue;
- }
- g_mutex_unlock (state->lock);
+ while (!done
+ && (object =
+ GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) {
if (GST_IS_BUFFER (object)) {
GstBuffer *buffer = GST_BUFFER_CAST (object);
+
+ g_mutex_lock (state->queue_lock);
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_PREROLL)) {
g_print ("got a reclaim buffer\n");
flush_internal (state);
@@ -794,6 +813,8 @@ handle_queued_objects (APP_STATE_T * state)
update_image (state, buffer);
}
gst_buffer_unref (buffer);
+ g_mutex_unlock (state->queue_lock);
+ render_scene (state);
} else if (GST_IS_MESSAGE (object)) {
GstMessage *message = GST_MESSAGE_CAST (object);
g_print ("\nmessage %p ", message);
@@ -820,7 +841,7 @@ handle_queued_objects (APP_STATE_T * state)
g_print ("\nevent %p %s\n", event,
gst_event_type_get_name (GST_EVENT_TYPE (event)));
- g_mutex_lock (state->lock);
+ g_mutex_lock (state->queue_lock);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
flush_internal (state);
@@ -828,14 +849,14 @@ handle_queued_objects (APP_STATE_T * state)
default:
break;
}
- g_mutex_unlock (state->lock);
+ g_mutex_unlock (state->queue_lock);
gst_event_unref (event);
}
- g_mutex_lock (state->lock);
+ g_mutex_lock (state->queue_lock);
state->popped_obj = object;
g_cond_broadcast (state->cond);
- g_mutex_unlock (state->lock);
+ g_mutex_unlock (state->queue_lock);
}
return FALSE;
@@ -844,24 +865,29 @@ handle_queued_objects (APP_STATE_T * state)
static gboolean
queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous)
{
- g_mutex_lock (state->lock);
+ gboolean res = TRUE;
+
+ g_mutex_lock (state->flow_lock);
if (state->flushing) {
- g_mutex_unlock (state->lock);
gst_mini_object_unref (obj);
- return FALSE;
+ res = FALSE;
+ goto beach;
}
g_async_queue_push (state->queue, obj);
if (synchronous) {
/* Waiting for object to be handled */
+ g_mutex_lock (state->queue_lock);
do {
- g_cond_wait (state->cond, state->lock);
+ g_cond_wait (state->cond, state->queue_lock);
} while (!state->flushing && state->popped_obj != obj);
+ g_mutex_unlock (state->queue_lock);
}
- g_mutex_unlock (state->lock);
- return TRUE;
+beach:
+ g_mutex_unlock (state->flow_lock);
+ return res;
}
static void
@@ -922,6 +948,7 @@ init_playbin_player (APP_STATE_T * state, const gchar * uri)
state->pipeline = gst_element_factory_make ("playbin2", "player");
g_object_set (state->pipeline, "uri", uri, "video-sink", vsink, NULL);
+ state->vsink = gst_object_ref (vsink);
return TRUE;
}
@@ -956,6 +983,7 @@ init_parse_launch_player (APP_STATE_T * state, const gchar * spipeline)
gst_pad_add_event_probe (gst_element_get_static_pad (vsink, "sink"),
G_CALLBACK (events_cb), state);
+ state->vsink = gst_object_ref (vsink);
return TRUE;
}
@@ -1049,7 +1077,7 @@ handle_keyboard (GIOChannel * source, GIOCondition cond, APP_STATE_T * state)
pipeline_pause (state);
break;
case 'r':
- gst_element_set_state (state->pipeline, GST_STATE_PLAYING);
+ pipeline_play (state);
break;
case 'l':
report_position_duration (state);
@@ -1061,7 +1089,8 @@ handle_keyboard (GIOChannel * source, GIOCondition cond, APP_STATE_T * state)
seek_backward (state);
break;
case 'q':
- gst_element_send_event (state->pipeline, gst_event_new_eos ());
+ flush_start (state);
+ gst_element_set_state (state->pipeline, GST_STATE_READY);
break;
}
}
@@ -1106,10 +1135,10 @@ buffering_cb (GstBus * bus, GstMessage * msg, APP_STATE_T * state)
gst_message_parse_buffering (msg, &percent);
g_print ("Buffering %3d%%\r", percent);
if (percent < 100)
- gst_element_set_state (state->pipeline, GST_STATE_PAUSED);
+ pipeline_pause (state);
else {
g_print ("\n");
- gst_element_set_state (state->pipeline, GST_STATE_PLAYING);
+ pipeline_play (state);
}
}
@@ -1335,6 +1364,11 @@ done:
/* Release pipeline */
if (state->pipeline) {
gst_element_set_state (state->pipeline, GST_STATE_NULL);
+ if (state->vsink) {
+ gst_object_unref (state->vsink);
+ state->vsink = NULL;
+ }
+
gst_object_unref (state->pipeline);
}