GStreamer: how to recover from an (rtmpsink) error

I'm building a streaming application in Python with GStreamer.

The application writes data to rtmpsink and to filesink using a tee element. Starting and streaming work fine in an ideal environment (local network), but what should I do if, for example, the connection to the streaming server drops? I'm trying to figure out how to keep the pipeline running, and thus keep writing to filesink, after an error occurs.

What I'm trying to achieve:

  1. At the very least, I would like to keep my archive file (filesink) after an error occurs in the streaming part (rtmpsink), so that we have a backup if an error happens.
  2. Manually reconnect to the streaming server.
  3. Build some mechanism to check the connection and reconnect the streaming part (rtmpsink) if possible.

Question(s):

Is it possible to achieve what I'm trying to do?

How can it be achieved (dynamic pipeline / probes / extra elements)?

Any explanation, example, or pointer in the right direction would be very much appreciated.

note:

GStreamer version: 1.3.90 (rtmpsink, faac, x264enc)
OS: Ubuntu 14.04 LTS
Streaming server: Wowza 4.x

Test application (code): link
Pipeline after startup(OK): link

Pipeline after rtmpsink error(Failed to write data): link
Log snippet after rtmpsink error(Failed to write data): link



Solution 1:[1]

I'm not sure how reliable a system you'll get with a single pipeline for this. What I'd recommend is a two-stage process:

1) audio -> encode -> tee -> filesink
                          -> shmsink 

2) shmsrc -> mux -> rtmpsink

Then create a wrapper script for the second pipeline that restarts it whenever it exits. Here's a sample of how to use these elements with a videotestsrc. Note that the caps are really important: they have to be detailed enough for the receiving pipeline to know what's coming in over the shared memory.

gst-launch-1.0 videotestsrc ! tee name=t ! queue ! videoconvert ! ximagesink t. ! video/x-raw,width=400,height=400,format=BGRA ! shmsink wait-for-connection=false socket-path=/tmp/shr

gst-launch-1.0 shmsrc socket-path=/tmp/shr ! video/x-raw,width=400,height=400,format=BGRA,framerate=30/1 ! videoconvert ! ximagesink
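
The wrapper script itself is not shown in this answer. A minimal sketch in Python, assuming that gst-launch-1.0 exits when the pipeline posts an error and that simply starting it again is enough to reconnect, could look like this. The names run_with_restart and SECOND_STAGE are hypothetical, and the command is the shmsrc test pipeline from above rather than a real RTMP sender:

```python
#!/usr/bin/env python3
"""Restart a command (for example a gst-launch-1.0 pipeline) whenever it exits."""
import subprocess
import time

# Hypothetical second-stage command: the shmsrc test pipeline shown above.
# For real streaming, the tail would be replaced by the mux and rtmpsink.
SECOND_STAGE = [
    "gst-launch-1.0",
    "shmsrc", "socket-path=/tmp/shr",
    "!", "video/x-raw,width=400,height=400,format=BGRA,framerate=30/1",
    "!", "videoconvert",
    "!", "ximagesink",
]


def run_with_restart(cmd, max_restarts=None, delay=2.0):
    """Run cmd repeatedly and return how many times it was started.

    max_restarts=None means "restart forever"; delay is the pause in
    seconds between one exit and the next start.
    """
    starts = 0
    while True:
        starts += 1
        returncode = subprocess.call(cmd)
        print(f"command exited with code {returncode} (start #{starts})")
        if max_restarts is not None and starts > max_restarts:
            return starts
        time.sleep(delay)
```

Calling run_with_restart(SECOND_STAGE) keeps the second pipeline coming back after every failure, while the first pipeline (and its filesink) keeps running untouched because shmsink is created with wait-for-connection=false.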

You could also try this approach with TCP/UDP instead of shared memory. I don't have the faac plugin installed, but the pipeline would probably be something like this:

audio -> faac -> rtpmp4apay -> udpsink host=localhost port=1919

udpsrc port=1919 -> rtpmp4adepay -> mux -> rtmpsink

Solution 2:[2]

I have also been working on trying to get a pipeline to reconnect to an RTMP server after errors. In principle I agree with @mpr's answer (using two pipelines connected with an shmsink/shmsrc pair) but I was unable to get that to work reliably, so I ended up using a different strategy.

I am using rtmp2sink. When it encounters an error, it posts a message on the pipeline bus and then returns GST_FLOW_FLUSHING, which causes the pipeline to flush everything. That is not what I wanted, so I added a GhostPad in front of the rtmp2sink that catches the return value and turns it back into GST_FLOW_OK. At that point I also reset the rtmp2sink element to make it reconnect.

This seems pretty reliable, and at least against the RTMP server I was using I did not need to do anything special for handling keyframes from encoders.

All of this was tested with GStreamer version 1.18.5. Here is a very basic example in Python showing this approach:

#!/usr/bin/env python3
import gi

gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib

def _handle_message(_, message, loop):
    """handle messages posted to pipeline bus"""
    if message.type == Gst.MessageType.EOS:
        print("End-of-stream")
        loop.quit()
    elif message.type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        if message.src.__class__.__name__ == "GstRtmp2Sink" and err.matches(
            Gst.ResourceError.quark(), Gst.ResourceError(err.code)
        ):
            resource_error = Gst.ResourceError(err.code)
            print(f"caught {resource_error} from rtmp2sink, ignoring")
        else:
            print(f"caught error {err} ({debug}), exiting")
            loop.quit()
    return True

def _wrap_rtmp_sink(rtmpsink: Gst.Element):
    """wrap RTMP sink to make it handle reconnections"""

    def _chain(pad: Gst.Pad, _, buffer: Gst.Buffer):
        internal_pad = pad.get_internal()
        result = internal_pad.push(buffer)
        if result == Gst.FlowReturn.FLUSHING or result == Gst.FlowReturn.ERROR:
            print(f"Restarting RTMP sink after {result}")
            rtmpsink.set_state(Gst.State.NULL)
            rtmpsink.set_state(Gst.State.PLAYING)
            return Gst.FlowReturn.OK

        return result

    sinkpad = rtmpsink.get_static_pad("sink")
    peer = sinkpad.get_peer()
    peer.unlink(sinkpad)

    ghost_pad = Gst.GhostPad.new("proxypad", sinkpad)
    ghost_pad.set_chain_function_full(_chain)
    peer.link(ghost_pad)
    ghost_pad.activate_mode(Gst.PadMode.PUSH, True)

    # hang on to GhostPad to avoid Python garbage collecting it
    rtmpsink._ghost_pad = ghost_pad

def main():
    Gst.init(None)
    pipeline = Gst.parse_launch(
        """
        videotestsrc
            ! video/x-raw,width=1280,height=720,framerate=30/1
            ! avenc_h264_videotoolbox
            ! h264parse
            ! flvmux.video

        audiotestsrc
            ! faac
            ! flvmux.audio

        flvmux name=flvmux streamable=true
            ! queue
            ! rtmp2sink name=rtmp location=rtmp://10.1.0.10/test/test
    """
    )

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", _handle_message, loop)

    _wrap_rtmp_sink(pipeline.get_by_name("rtmp"))

    pipeline.set_state(Gst.State.PLAYING)
    loop.run()
    pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    main()
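
One possible refinement, which is not part of the tested example above: as written, _chain resets the sink on every buffer that comes back with FLUSHING or ERROR, so a server that stays down may trigger a reconnect attempt per buffer. Whether that causes problems in practice is an assumption here, not something verified. A small, GStreamer-independent sketch of a throttle that limits how often the reset happens could look like this (RestartThrottle and min_interval are hypothetical names):

```python
import time


class RestartThrottle:
    """Allow at most one restart per min_interval seconds."""

    def __init__(self, min_interval=5.0, clock=time.monotonic):
        self._min_interval = min_interval
        self._clock = clock  # injectable so the class can be tested
        self._last = None    # time of the last allowed restart

    def allow(self):
        """Return True if enough time has passed since the last restart."""
        now = self._clock()
        if self._last is None or now - self._last >= self._min_interval:
            self._last = now
            return True
        return False
```

To use it, create one RestartThrottle in _wrap_rtmp_sink and, inside _chain, only call the two rtmpsink.set_state(...) lines when allow() returns True. Returning Gst.FlowReturn.OK in both cases keeps the rest of the pipeline running; buffers pushed while the sink is down are simply dropped.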

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution      Source
Solution 1    Community
Solution 2    AHM