Hi,

I'm still trying to fix my audio/video problem. First, for resume, the
goal's program is to create a video-conferencing between two contacts
using Farsight and Telepathy.
This program works sometimes (I get two "video pops" and the connection
was stable), but often the program fails. 

I'm wondering if there is a problem with my installation or with my
program itself. It would be nice if someone on the list could give a try
to the program. The script is attached and to test it just run two
clients, one with "slave" parameter and seconds with "master" parameter
(master account initiate the call or slave accept the call).

Prior to testing, fill in your accounts informations in line 70-73:
MASTER_ACCOUNT = "[email protected]"
MASTER_PASSWORD = "aaa"
SLAVE_ACCOUNT = "[email protected]"
SLAVE_PASSWORD = "bbb"

Test procedure is simple: launch the program a couple of times. Here it
works maybe one time out of five. Just tell me if it works on your
machine more or less often than that. 

Thank you very much,
-- 
Fabien LOUIS <[email protected]>
"""
Example to create a videoconferencing application using Farsight throught
Telepathy
Works sometimes with telepathy-gabble-0.7.26

Just run two clients, one with "slave" parameter and seconds with "master"
parameter (master account initiate the call or slave accept the call).
"""

import sys
import os
os.environ["GST_PLUGIN_PATH"]="/usr/local/lib/gstreamer-0.10"

#os.environ["GST_DEBUG"] = "fsrtp*:5"
os.environ["GST_DEBUG"] = "2,fs*:5"
#os.environ["GST_DEBUG"] = "*PAD*:5,*bin*:5"

import pygst
pygst.require('0.10')

import gobject
import dbus.glib

import tpfarsight
import telepathy
import farsight, gst
from telepathy.interfaces import (
    CONNECTION,
    CONNECTION_MANAGER,
    CONNECTION_INTERFACE_CAPABILITIES,
    CHANNEL_TYPE_STREAMED_MEDIA,
    CHANNEL_TYPE_TEXT,
    CONNECTION_INTERFACE_REQUESTS,
    CHANNEL_INTERFACE_GROUP,
)
from telepathy.constants import (
    CONNECTION_STATUS_CONNECTED,
    HANDLE_TYPE_CONTACT,
    MEDIA_STREAM_TYPE_VIDEO,
    CHANNEL_TEXT_MESSAGE_TYPE_NORMAL,
    MEDIA_STREAM_PENDING_LOCAL_SEND,
    MEDIA_STREAM_PENDING_REMOTE_SEND,
    MEDIA_STREAM_TYPE_AUDIO,
    MEDIA_STREAM_TYPE_VIDEO,
    MEDIA_STREAM_STATE_DISCONNECTED,
    MEDIA_STREAM_STATE_CONNECTED,
    MEDIA_STREAM_STATE_CONNECTING,
    MEDIA_STREAM_DIRECTION_NONE,
    MEDIA_STREAM_DIRECTION_SEND,
    MEDIA_STREAM_DIRECTION_RECEIVE,
    MEDIA_STREAM_DIRECTION_BIDIRECTIONAL,
)

media_stream_type = {MEDIA_STREAM_TYPE_AUDIO : "Audio",\
                     MEDIA_STREAM_TYPE_VIDEO : "Video"}

media_stream_state = {MEDIA_STREAM_STATE_DISCONNECTED : "Disconnected",\
                      MEDIA_STREAM_STATE_CONNECTED : "Connected",
                      MEDIA_STREAM_STATE_CONNECTING : "Connecting"}

media_stream_direction = {MEDIA_STREAM_DIRECTION_NONE : "None",\
                          MEDIA_STREAM_DIRECTION_SEND : "Send",
                          MEDIA_STREAM_DIRECTION_RECEIVE : "Receive",
                          MEDIA_STREAM_DIRECTION_BIDIRECTIONAL : "Bidirectional"}

media_stream_pending_local = {MEDIA_STREAM_PENDING_LOCAL_SEND : "Local send",\
                              MEDIA_STREAM_PENDING_REMOTE_SEND : "Remote send"}

##################################################
MASTER_ACCOUNT = ""
MASTER_PASSWORD = ""
SLAVE_ACCOUNT = ""
SLAVE_PASSWORD = ""
##################################################
def debug_callback(self, *args, **kwargs):
    print "debug_callback"
    # Print all kwarg
    for arg in args:
        print "\t[arg] %s" % str(arg)       
    # Print all kwarg
    for kwarg in kwargs:
        print "\t[kwarg]%s: %s" % (kwarg, kwargs[kwarg])
        
def on_status_changed_user(state, reason):    
    if state == CONNECTION_STATUS_CONNECTED:
        print "[user] Connected"
        gobject.timeout_add(2000, enable_capabilities)
        if mode == "master":
            print "Creating channel"
            gobject.timeout_add(5000, create_channel)
        else:
            print "Waiting streams..."

def enable_capabilities():
    conn[CONNECTION_INTERFACE_CAPABILITIES].AdvertiseCapabilities(
                            [(CHANNEL_TYPE_STREAMED_MEDIA, 15)], [])

def create_channel():
    # Request contact handle
    contact_handle = conn[CONNECTION].RequestHandles(HANDLE_TYPE_CONTACT,
                                                     [SLAVE_ACCOUNT])[0]    
    # Create CHANNEL_TYPE_STREAMED_MEDIA with handle
    channel_infos = conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({
                    "org.freedesktop.Telepathy.Channel.ChannelType":
                        CHANNEL_TYPE_STREAMED_MEDIA,
                    "org.freedesktop.Telepathy.Channel.TargetHandleType":
                        HANDLE_TYPE_CONTACT,
                    "org.freedesktop.Telepathy.Channel.TargetHandle":
                        contact_handle})
            
def on_new_channel_user(object_path, channel_type, handle_type,
                         handle, suppress_handler):        
    if channel_type == CHANNEL_TYPE_STREAMED_MEDIA:
        # When CHANNEL_TYPE_STREAMED_MEDIA is ready
        print "[user] New %s channel" % (channel_type)

        # Create a tf listener
        my_tf_listener = TfListener(conn, object_path)
        if mode == "master":
            # Create a stream
            my_tf_listener.create_streams(SLAVE_ACCOUNT, [MEDIA_STREAM_TYPE_VIDEO])
##################################################        
class TfListener(gobject.GObject):
    def __init__(self, connection, chan_object_path):
        """
        Init
        @param connection: current connection
        @type connection: Telepathy Connection
        @param chan_object_path: Object path of the StreamedMedia channel
        @type chan_object_path: String
        """
        super(TfListener, self).__init__()        
        self.pipeline = None
        self.conn = connection
        
        # Create tfChannel with the telepathy channel path
        self.tf_channel = tpfarsight.Channel(
                                     connection_busname=self.conn.service_name,
                                     connection_path=self.conn.object_path,
                                     channel_path=chan_object_path)
        # Connect to several signals
        print "connecting to channel",  self.tf_channel
        self.tf_channel.connect("session-created", self.__on_session_created)
        self.tf_channel.connect("stream-created", self.__on_stream_created)
        self.tf_channel.connect("stream-get-codec-config",
                                self.__on_stream_get_codec_config)        
        
        # Get telepathy channel
        self.t_channel = telepathy.client.channel.Channel(self.conn.service_name,
                                                          chan_object_path,
                                                          self.conn.bus)
        self.t_channel[CHANNEL_INTERFACE_GROUP].connect_to_signal(
                                                    "MembersChanged",
                                                    self.__on_members_changed,
                                                    path_keyword='dbusPath')
        # Connect to several signals
        self.t_channel[CHANNEL_TYPE_STREAMED_MEDIA].connect_to_signal(None,
                                debug_callback, member_keyword='dbus_member')        
                
        # Regarding if streams are already in, and accepted them
        if self.t_channel[CHANNEL_TYPE_STREAMED_MEDIA].ListStreams() != []:
            self.accept_stream()            
            
        self.show_stream_informations()

    def create_streams(self, contact_name, streams, message=""):
        """
        Request a stream to specified contact
        @param contact_name: Contact name
        @type contact_name: String
        @param streams: Specified streams
        @type streams: Array of MEDIA_STREAM_TYPE_***
        @param message: Optional message
        @type message: String
        """  
        # Request a VIDEO stream and add contact
        contact_handle = self.conn[CONNECTION].RequestHandles(
                                                        HANDLE_TYPE_CONTACT,
                                                        [contact_name])[0]
        try:
            self.t_channel[CHANNEL_TYPE_STREAMED_MEDIA].RequestStreams(
                                                        contact_handle, streams)
        except dbus.exceptions.DBusException, e:
            exception = str(e).split(": ")
            if(exception[0] == "org.freedesktop.Telepathy.Error.Offline"):
                print "[Warning] %s (OK)" % (exception[1])
                return False
            else:
                print "Exception non traitee", exception[1]
                raise e
        else:
            self.t_channel[CHANNEL_INTERFACE_GROUP].AddMembers([contact_handle],
                                                               message)
        
    def accept_stream(self):
        """
        Add owner to accept the call
        """
        # Add owner
        self.t_channel[CHANNEL_INTERFACE_GROUP].AddMembers(
                                [self.conn[CONNECTION].GetSelfHandle()], "")

    def show_stream_informations(self):
        """
        Display stream informations
        """
        streams_infos = self.t_channel[CHANNEL_TYPE_STREAMED_MEDIA].ListStreams()
        for stream_infos in streams_infos:        
            print "\t== Stream Info =="
            print "\t%s stream (%d) offered by %s. State: %s || Direction %s || Flags %s" % (
                        media_stream_type[stream_infos[2]], stream_infos[0],
                        self.__get_contacts_name([stream_infos[1]])[0],
                        media_stream_state[stream_infos[3]],
                        media_stream_direction[stream_infos[4]], stream_infos[5])
            print

    def __on_session_created(self, channel, conference, participant):
        """
        On signal "session-created", create pipeline and add conference
        """
        print
        print "=== %s __on_session_created ===" % self
        print
       
        self.pipeline = gst.Pipeline()        
        self.pipeline.add(conference)
        self.pipeline.set_state(gst.STATE_PLAYING)
        
        # Transfer all bus message
        self.pipeline.get_bus().add_watch(self.__async_handler)
    
    def __on_stream_created(self, channel, stream):
        """
        On signal "stream-created", add videosource
        """
        print
        print "=== %s __on_stream_created ===" % self
        print
        
        stream.connect("src-pad-added", self.__src_pad_added)
        stream.connect("closed", debug_callback, "closed")
        stream.connect("error", debug_callback, "error")
        stream.connect("free-resource", debug_callback, "free")
        
        # creating src pipes        
#        if mode == "master": 
#            videosource = gst.parse_bin_from_description (
#                        "v4l2src ! timeoverlay", True)
#        else:
        videosource = gst.parse_bin_from_description (
                        "videotestsrc is-live=1 ! timeoverlay", True)    
    
        self.pipeline.add(videosource)
        
        pad = stream.get_property("sink-pad")
        videosource.get_pad("src").link(stream.get_property("sink-pad"))
        
        videosource.set_state(gst.STATE_PLAYING)
            
    def __async_handler (self, bus, message):
        """
        Check all bus message and redirect them to tf_channel (obligatory)
        """
        if self.tf_channel != None:
            self.tf_channel.bus_message(message)
        return True

    def __on_stream_get_codec_config(self, channel, stream_id, media_type,
                                     direction):
        """
        On signal "stream-get-codec-config", returns our codec configuration
        """
        print
        print "=== %s __on_stream_get_codec_config ===" % self
        print
#        return [farsight.Codec(farsight.CODEC_ID_DISABLE, "THEORA", farsight.MEDIA_TYPE_VIDEO, 0),
#                farsight.Codec(farsight.CODEC_ID_ANY, "H263", farsight.MEDIA_TYPE_VIDEO, 0)]
        return [farsight.Codec(farsight.CODEC_ID_ANY, "H263",
                               farsight.MEDIA_TYPE_VIDEO, 90000),
                farsight.Codec(farsight.CODEC_ID_ANY, "H263-1998",
                               farsight.MEDIA_TYPE_VIDEO, 90000),            
                farsight.Codec(farsight.CODEC_ID_ANY, "THEORA",
                               farsight.MEDIA_TYPE_VIDEO, 90000),
                farsight.Codec(farsight.CODEC_ID_ANY, "H264",
                               farsight.MEDIA_TYPE_VIDEO, 90000)]
    
    def __src_pad_added (self, stream, pad, codec):
        print
        print "=== %s __src_pad_added ===" % self
        print
        type = stream.get_property ("media-type")
        if type == farsight.MEDIA_TYPE_AUDIO:
            sink = gst.parse_bin_from_description(
                "queue ! audioconvert ! audioresample ! audioconvert ! autoaudiosink", True)
            audioadder = gst.element_factory_make("liveadder")
            audiotee = gst.element_factory_make("tee")
            
            self.pipeline.add(sink)
            self.pipeline.add(audioadder)
            self.pipeline.add(audiotee)
                
            audiotee.link(sink)
            audioadder.link(audiotee)
            
            pad.link(audioadder.get_pad("sink%d"))            
            
            sink.set_state(gst.STATE_PLAYING)
            audiotee.set_state(gst.STATE_PLAYING)
            audioadder.set_state(gst.STATE_PLAYING)
                
        elif type == farsight.MEDIA_TYPE_VIDEO:
            sink = gst.parse_bin_from_description(
                "queue ! ffmpegcolorspace ! videoscale ! autovideosink", True)
            videotee = gst.element_factory_make("tee")
            videofunnel = gst.element_factory_make("fsfunnel")
            
            self.pipeline.add(sink)
            self.pipeline.add(videofunnel)
            self.pipeline.add(videotee)     
            
            videofunnel.link(videotee)
            videotee.link(sink)
            
            pad.link(videofunnel.get_pad("sink%d"))    
            
            sink.set_state(gst.STATE_PLAYING)
            videotee.set_state(gst.STATE_PLAYING)
            videofunnel.set_state(gst.STATE_PLAYING)
        
    def __on_members_changed(self, *args, **kwargs):
        """
        When members changed in INTERFACE_GROUP of telepathy channel
        """
        print
        print "Members changed"
        
        print "t_channel", self.t_channel
        
        print "\tCurrent members\t\t", 
        for member in self.t_channel[CHANNEL_INTERFACE_GROUP].GetMembers():
            print "(", member, ")", self.__get_contacts_name([member])[0],
        
        print
        print "\tLocal pending members\t", 
        for member in self.t_channel[CHANNEL_INTERFACE_GROUP].GetLocalPendingMembers():
            print "(", member, ")", self.__get_contacts_name([member])[0],
        
        print    
        print "\tRemote pending members\t", 
        for member in self.t_channel[CHANNEL_INTERFACE_GROUP].GetRemotePendingMembers():
            print "(", member, ")", self.__get_contacts_name([member])[0],
            
        print
        print
        
    def __get_contacts_name(self, contact_handles):
        """
        Return an array with contact's name
        """    
        contact_names = self.conn[CONNECTION].InspectHandles(HANDLE_TYPE_CONTACT,
                                                        contact_handles)
        return contact_names
##################################################
##################################################

if len(sys.argv) != 2:
    print "Usage: %s {master|slave}" % sys.argv[0]
    exit(1)

mode = sys.argv[1]

print "=== %s ===" % sys.argv[0]
print "Mode: %s" % mode

# Enable parameters due to mode
if mode == "master":
    parameters = {"account": MASTER_ACCOUNT, "password": MASTER_PASSWORD}
else:
    parameters = {"account": SLAVE_ACCOUNT, "password": SLAVE_PASSWORD}

##################################################

# Get the requested manager
reg = telepathy.client.ManagerRegistry()
reg.LoadManagers()    
manager = reg.GetManager("gabble")

# Request connection
conn_bus_name, conn_object_path = manager[CONNECTION_MANAGER].\
                                        RequestConnection("jabber", parameters)             
conn = telepathy.client.Connection(conn_bus_name, conn_object_path)

# Listen signals
conn[CONNECTION].connect_to_signal("NewChannel", on_new_channel_user)
conn[CONNECTION].connect_to_signal("StatusChanged", on_status_changed_user)

# Connect
conn[CONNECTION].Connect()

# Wait
loop = gobject.MainLoop()
try:
    loop.run()
except KeyboardInterrupt:
    print 'interrupted'
_______________________________________________
telepathy mailing list
[email protected]
http://lists.freedesktop.org/mailman/listinfo/telepathy

Reply via email to