error: Got EOS from element "pipeline0"
gst-launch-1.0.exe rtspsrc location=rtsp://username:password@ip/rtsp_tunnel?vcd=2 ! application/x-rtp, media=application ! fakesink dump=true
"""Pull XML metadata from an RTSP stream through GStreamer and log object shapes.

The pipeline receives ``application/x-rtp, media=application`` packets in an
appsink. The RTP payloads are concatenated until a closing
``</tt:MetadataStream>`` tag is seen; the completed document is then parsed
and one line per detected object is appended to ``points.txt``.
"""
from threading import Thread
from time import sleep
from xml.parsers.expat import ExpatError
import sys

import gi
import xmltodict

gi.require_version("Gst", "1.0")
gi.require_version("GstApp", "1.0")
from gi.repository import Gst, GstApp, GLib  # noqa: E402  (must follow require_version)

# Closing tag that marks the end of one complete metadata document.
_END_OF_DOCUMENT = b'</tt:MetadataStream>'

# Size of the fixed part of an RTP header (RFC 3550, section 5.1).
_RTP_FIXED_HEADER = 12


def _as_list(node):
    """Return *node* as a list.

    xmltodict yields a dict for an element that occurs once and a list for a
    repeated element; this normalises both (and a missing ``None``) to a list.
    """
    if node is None:
        return []
    return node if isinstance(node, list) else [node]


def _object_fields(utc_time, obj):
    """Flatten one ``tt:Object`` dict into the list of strings written per line.

    Order: UTC time, object id, bounding box (bottom, top, right, left),
    centre of gravity (x, y), then x, y for every polygon point.

    Raises:
        KeyError, TypeError: if the object lacks part of the expected layout.
    """
    shape = obj['tt:Appearance']['tt:Shape']
    box = shape['tt:BoundingBox']
    gravity = shape['tt:CenterOfGravity']
    fields = [
        utc_time,
        obj['@ObjectId'],
        box['@bottom'],
        box['@top'],
        box['@right'],
        box['@left'],
        gravity['@x'],
        gravity['@y'],
    ]
    for point in _as_list(shape['tt:Polygon']['tt:Point']):
        fields.append(point['@x'])
        fields.append(point['@y'])
    return fields


def _rtp_payload(packet):
    """Return the payload bytes of one RTP packet.

    Skips the fixed header, any CSRC entries, an optional header extension,
    and trailing padding (RFC 3550, section 5.1). For a packet without CSRC,
    extension or padding this equals ``packet[12:]``.
    """
    if len(packet) < _RTP_FIXED_HEADER:
        return b''
    first = packet[0]
    offset = _RTP_FIXED_HEADER + 4 * (first & 0x0F)  # CC: number of CSRC ids
    if first & 0x10:  # X bit: a header extension follows the CSRC list
        if len(packet) < offset + 4:
            return b''
        words = int.from_bytes(packet[offset + 2:offset + 4], 'big')
        offset += 4 + 4 * words
    end = len(packet)
    if first & 0x20 and end > offset:  # P bit: last byte holds the pad count
        end -= packet[-1]
    return packet[offset:end]


def xml_parser(data):
    """Parse one metadata document and append its objects to ``points.txt``.

    Every ``tt:Object`` found under
    ``tt:MetadataStream/tt:VideoAnalytics/tt:Frame`` produces one
    comma-separated line (see ``_object_fields`` for the column order), which
    is also printed. Documents or objects that do not have the expected
    layout are skipped; malformed XML is reported on stderr.

    Args:
        data: The complete XML document as text.

    Raises:
        OSError: if ``points.txt`` cannot be written.
    """
    print('ok')
    try:
        stream = xmltodict.parse(data)
        frames = _as_list(
            stream['tt:MetadataStream']['tt:VideoAnalytics']['tt:Frame'])
    except ExpatError as err:
        print(f'skipping malformed metadata document: {err}', file=sys.stderr)
        return
    except (KeyError, TypeError):
        # Document carries no video-analytics frame: nothing to record.
        return

    rows = []
    for frame in frames:
        try:
            utc_time = frame['@UtcTime']
            objects = _as_list(frame['tt:Object'])
        except (KeyError, TypeError):
            continue  # frame without objects
        for obj in objects:
            try:
                rows.append(_object_fields(utc_time, obj))
            except (KeyError, TypeError):
                continue  # object without the shape data we log

    if not rows:
        return
    # newline='' disables newline translation so the explicit '\r\n' is
    # written as-is (text mode on Windows would otherwise emit '\r\r\n').
    with open('points.txt', 'a', newline='', encoding='utf-8') as f:
        for row in rows:
            f.write(', '.join(row) + '\r\n')
    for row in rows:
        print(row)


def on_new_buffer(p1):
    """Print the raw bytes of the next sample available on appsink *p1*.

    Intended as a handler for the appsink ``new-sample`` signal; it is not
    connected by this script, which pulls samples in its own loop instead.

    Returns:
        ``Gst.FlowReturn.OK``, the value a ``new-sample`` handler must return
        to keep the stream flowing.
    """
    sample = p1.try_pull_sample(Gst.SECOND)
    if sample is None:
        return Gst.FlowReturn.OK
    buffer = sample.get_buffer()
    print(buffer.extract_dup(0, buffer.get_size()))
    return Gst.FlowReturn.OK


# NOTE(review): the default below embeds camera credentials in the source;
# prefer passing the RTSP URI as the first command-line argument.
uri = (
    sys.argv[1]
    if len(sys.argv) > 1
    else 'rtsp://service:tam!dhc0@172.16.130.110/rtsp_tunnel?vcd=2'
)

Gst.init(None)

pipeline = Gst.parse_launch(
    'rtspsrc name=src ! application/x-rtp, media=application ! '
    'appsink emit-signals=True name=sink'
)
# Set the location as a property rather than inside the launch string so that
# characters in the URI are never interpreted by the launch-syntax parser.
pipeline.get_by_name('src').set_property('location', uri)
pipeline.set_delay(0)
pipeline.set_latency(0)
sink = pipeline.get_by_name('sink')
bus = pipeline.get_bus()

main_loop = GLib.MainLoop()
main_loop_thread = Thread(target=main_loop.run)
main_loop_thread.start()

data = []
try:
    pipeline.set_state(Gst.State.PLAYING)
    while True:
        sample = sink.try_pull_sample(Gst.SECOND)
        print(sample)
        if sample is None:
            # No sample within the timeout: stop if the pipeline reported an
            # error or end-of-stream, otherwise keep waiting.
            message = bus.pop_filtered(
                Gst.MessageType.ERROR | Gst.MessageType.EOS)
            if message is None:
                continue
            if message.type == Gst.MessageType.ERROR:
                error, debug = message.parse_error()
                print(f'{error.message} ({debug})', file=sys.stderr)
            break
        print('okkk')
        buffer = sample.get_buffer()
        payload = _rtp_payload(buffer.extract_dup(0, buffer.get_size()))
        data.append(payload)
        if payload.rstrip().endswith(_END_OF_DOCUMENT):
            # Decode only the assembled document: a multi-byte character may
            # be split across two RTP packets.
            xml_parser(b''.join(data).decode())
            data = []
except KeyboardInterrupt:
    pass
except Exception as e:
    print(e)
finally:
    # Always release the pipeline and stop the GLib loop; otherwise the
    # non-daemon loop thread keeps the process alive.
    pipeline.set_state(Gst.State.NULL)
    main_loop.quit()
    main_loop_thread.join()
    print('end')
var
This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)