Hi,
I'm still trying to fix my audio/video problem. First, to summarize: the
goal of the program is to set up a video conference between two contacts
using Farsight and Telepathy.
This program works sometimes (I get two "video pops" and the connection
is stable), but often the program fails.
I'm wondering if there is a problem with my installation or with my
program itself. It would be nice if someone on the list could give the
program a try. The script is attached; to test it, just run two
clients, one with the "slave" parameter and the second with the "master"
parameter (the master account initiates the call and the slave accepts it).
Prior to testing, fill in your account information on lines 70-73:
MASTER_ACCOUNT = "[email protected]"
MASTER_PASSWORD = "aaa"
SLAVE_ACCOUNT = "[email protected]"
SLAVE_PASSWORD = "bbb"
Test procedure is simple: launch the program a couple of times. Here it
works maybe one time out of five. Just tell me if it works on your
machine more or less often than that.
Thank you very much,
--
Fabien LOUIS <[email protected]>
"""
Example of a videoconferencing application using Farsight through
Telepathy.
Works sometimes with telepathy-gabble-0.7.26.
Just run two clients, one with the "slave" parameter and the second with the
"master" parameter (the master account initiates the call and the slave
accepts it).
"""
import sys
import os
os.environ["GST_PLUGIN_PATH"]="/usr/local/lib/gstreamer-0.10"
#os.environ["GST_DEBUG"] = "fsrtp*:5"
os.environ["GST_DEBUG"] = "2,fs*:5"
#os.environ["GST_DEBUG"] = "*PAD*:5,*bin*:5"
import pygst
pygst.require('0.10')
import gobject
import dbus.glib
import tpfarsight
import telepathy
import farsight, gst
from telepathy.interfaces import (
CONNECTION,
CONNECTION_MANAGER,
CONNECTION_INTERFACE_CAPABILITIES,
CHANNEL_TYPE_STREAMED_MEDIA,
CHANNEL_TYPE_TEXT,
CONNECTION_INTERFACE_REQUESTS,
CHANNEL_INTERFACE_GROUP,
)
from telepathy.constants import (
CONNECTION_STATUS_CONNECTED,
HANDLE_TYPE_CONTACT,
MEDIA_STREAM_TYPE_VIDEO,
CHANNEL_TEXT_MESSAGE_TYPE_NORMAL,
MEDIA_STREAM_PENDING_LOCAL_SEND,
MEDIA_STREAM_PENDING_REMOTE_SEND,
MEDIA_STREAM_TYPE_AUDIO,
MEDIA_STREAM_TYPE_VIDEO,
MEDIA_STREAM_STATE_DISCONNECTED,
MEDIA_STREAM_STATE_CONNECTED,
MEDIA_STREAM_STATE_CONNECTING,
MEDIA_STREAM_DIRECTION_NONE,
MEDIA_STREAM_DIRECTION_SEND,
MEDIA_STREAM_DIRECTION_RECEIVE,
MEDIA_STREAM_DIRECTION_BIDIRECTIONAL,
)
# Human-readable labels for the Telepathy media-stream constants, keyed by
# the constant's value. They are used when printing stream details.
media_stream_type = {
    MEDIA_STREAM_TYPE_AUDIO: "Audio",
    MEDIA_STREAM_TYPE_VIDEO: "Video",
}
media_stream_state = {
    MEDIA_STREAM_STATE_DISCONNECTED: "Disconnected",
    MEDIA_STREAM_STATE_CONNECTED: "Connected",
    MEDIA_STREAM_STATE_CONNECTING: "Connecting",
}
media_stream_direction = {
    MEDIA_STREAM_DIRECTION_NONE: "None",
    MEDIA_STREAM_DIRECTION_SEND: "Send",
    MEDIA_STREAM_DIRECTION_RECEIVE: "Receive",
    MEDIA_STREAM_DIRECTION_BIDIRECTIONAL: "Bidirectional",
}
# Despite its name this table labels both pending-send flags (local and
# remote).
media_stream_pending_local = {
    MEDIA_STREAM_PENDING_LOCAL_SEND: "Local send",
    MEDIA_STREAM_PENDING_REMOTE_SEND: "Remote send",
}

##################################################
# Credentials of the two test accounts; fill these in before running.
# They are passed as the "account"/"password" connection parameters, and
# SLAVE_ACCOUNT is also the contact that the master calls.
MASTER_ACCOUNT = ""
MASTER_PASSWORD = ""
SLAVE_ACCOUNT = ""
SLAVE_PASSWORD = ""
##################################################
def debug_callback(self, *args, **kwargs):
    """
    Generic signal handler that dumps everything it receives to stdout.

    It is connected both to GObject signals (where the first positional
    argument is the emitting object) and to D-Bus signals (where the first
    positional argument is the first signal argument). That first argument
    is bound to ``self`` only because of the signature; it is printed along
    with the others so that no signal argument is lost from the trace.

    @param self: first positional argument delivered by the signal
    @param args: remaining positional arguments
    @param kwargs: keyword arguments (e.g. ``dbus_member``)
    """
    print("debug_callback")
    # Print every positional argument, including the one bound to ``self``
    for arg in (self,) + args:
        print("\t[arg] %s" % str(arg))
    # Print every keyword argument
    for name, value in kwargs.items():
        print("\t[kwarg]%s: %s" % (name, value))
def on_status_changed_user(state, reason):
    """
    Handler for the connection's "StatusChanged" signal.

    Once the connection reports CONNECTED, schedules the capability
    advertisement (after 2 s) and, in master mode, the creation of the
    media channel (after 5 s). Every other status is ignored.

    @param state: new connection status
    @param reason: reason for the change (unused)
    """
    if state != CONNECTION_STATUS_CONNECTED:
        return

    print("[user] Connected")
    gobject.timeout_add(2000, enable_capabilities)

    if mode != "master":
        print("Waiting streams...")
        return

    print("Creating channel")
    gobject.timeout_add(5000, create_channel)
def enable_capabilities():
    """
    Advertise StreamedMedia support on the connection.

    Returns None, so when used as a gobject timeout callback it runs once.
    """
    capabilities = conn[CONNECTION_INTERFACE_CAPABILITIES]
    # 15 is the type-specific flag mask for the channel type; presumably
    # audio | video | STUN | GTalk-P2P -- TODO confirm against the spec.
    to_add = [(CHANNEL_TYPE_STREAMED_MEDIA, 15)]
    to_remove = []
    capabilities.AdvertiseCapabilities(to_add, to_remove)
def create_channel():
    """
    Ask the connection for a StreamedMedia channel to SLAVE_ACCOUNT.

    The result of CreateChannel is deliberately not kept: the new channel
    is picked up by the connection's "NewChannel" signal handler. Returns
    None, so when used as a gobject timeout callback it runs once.
    """
    # Request contact handle
    contact_handle = conn[CONNECTION].RequestHandles(
        HANDLE_TYPE_CONTACT, [SLAVE_ACCOUNT])[0]

    # Create CHANNEL_TYPE_STREAMED_MEDIA with handle
    conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({
        "org.freedesktop.Telepathy.Channel.ChannelType":
            CHANNEL_TYPE_STREAMED_MEDIA,
        "org.freedesktop.Telepathy.Channel.TargetHandleType":
            HANDLE_TYPE_CONTACT,
        "org.freedesktop.Telepathy.Channel.TargetHandle":
            contact_handle,
    })
def on_new_channel_user(object_path, channel_type, handle_type,
                        handle, suppress_handler):
    """
    Handler for the connection's "NewChannel" signal.

    For StreamedMedia channels a TfListener is attached to the channel; in
    master mode a video stream to SLAVE_ACCOUNT is requested as well.
    Channels of any other type are ignored.

    @param object_path: object path of the new channel
    @param channel_type: D-Bus interface name of the channel type
    @param handle_type: unused
    @param handle: unused
    @param suppress_handler: unused
    """
    if channel_type != CHANNEL_TYPE_STREAMED_MEDIA:
        return

    # When CHANNEL_TYPE_STREAMED_MEDIA is ready
    print("[user] New %s channel" % (channel_type))

    # Create a tf listener
    listener = TfListener(conn, object_path)

    if mode == "master":
        # Create a stream
        listener.create_streams(SLAVE_ACCOUNT, [MEDIA_STREAM_TYPE_VIDEO])
##################################################
class TfListener(gobject.GObject):
    """
    Glue between one Telepathy StreamedMedia channel and a GStreamer pipeline.

    The channel is wrapped in a tpfarsight.Channel. When the Farsight
    session is created a pipeline is built around its conference element;
    every created stream is fed from a test video source, and every source
    pad a stream exposes is connected to an audio or video sink.
    """

    def __init__(self, connection, chan_object_path):
        """
        Init
        @param connection: current connection
        @type connection: Telepathy Connection
        @param chan_object_path: Object path of the StreamedMedia channel
        @type chan_object_path: String
        """
        super(TfListener, self).__init__()
        # Created lazily in __on_session_created
        self.pipeline = None
        self.conn = connection

        # Create tfChannel with the telepathy channel path
        self.tf_channel = tpfarsight.Channel(
            connection_busname=self.conn.service_name,
            connection_path=self.conn.object_path,
            channel_path=chan_object_path)

        # Connect to the tp-farsight signals
        print("connecting to channel %s" % (self.tf_channel,))
        self.tf_channel.connect("session-created", self.__on_session_created)
        self.tf_channel.connect("stream-created", self.__on_stream_created)
        self.tf_channel.connect("stream-get-codec-config",
                                self.__on_stream_get_codec_config)

        # Get telepathy channel
        self.t_channel = telepathy.client.channel.Channel(
            self.conn.service_name, chan_object_path, self.conn.bus)
        self.t_channel[CHANNEL_INTERFACE_GROUP].connect_to_signal(
            "MembersChanged",
            self.__on_members_changed,
            path_keyword='dbusPath')

        # Trace every signal of the StreamedMedia interface
        self.t_channel[CHANNEL_TYPE_STREAMED_MEDIA].connect_to_signal(
            None, debug_callback, member_keyword='dbus_member')

        # If streams are already present, accept the call
        if self.t_channel[CHANNEL_TYPE_STREAMED_MEDIA].ListStreams():
            self.accept_stream()
        self.show_stream_informations()

    def create_streams(self, contact_name, streams, message=""):
        """
        Request a stream to specified contact
        @param contact_name: Contact name
        @type contact_name: String
        @param streams: Specified streams
        @type streams: Array of MEDIA_STREAM_TYPE_***
        @param message: Optional message
        @type message: String
        @return: False when the request failed because the contact is
            offline, None otherwise
        @raise dbus.exceptions.DBusException: any other D-Bus error
        """
        # Request the streams and add the contact to the channel
        contact_handle = self.conn[CONNECTION].RequestHandles(
            HANDLE_TYPE_CONTACT, [contact_name])[0]
        try:
            self.t_channel[CHANNEL_TYPE_STREAMED_MEDIA].RequestStreams(
                contact_handle, streams)
        except dbus.exceptions.DBusException as e:
            # str(e) looks like "<error name>: <message>". partition() copes
            # with a missing separator and keeps any ": " inside the message.
            error_name, _, detail = str(e).partition(": ")
            if error_name == "org.freedesktop.Telepathy.Error.Offline":
                print("[Warning] %s (OK)" % (detail,))
                return False
            print("Exception non traitee %s" % (detail,))
            # Bare raise keeps the original traceback
            raise
        else:
            self.t_channel[CHANNEL_INTERFACE_GROUP].AddMembers(
                [contact_handle], message)

    def accept_stream(self):
        """
        Add owner to accept the call
        """
        self_handle = self.conn[CONNECTION].GetSelfHandle()
        self.t_channel[CHANNEL_INTERFACE_GROUP].AddMembers([self_handle], "")

    def show_stream_informations(self):
        """
        Display stream informations

        Prints one entry per stream returned by ListStreams: type, id,
        offering contact, state, direction and raw pending-send flags.
        """
        streams = self.t_channel[CHANNEL_TYPE_STREAMED_MEDIA].ListStreams()
        for stream_infos in streams:
            stream_id = stream_infos[0]
            contact_handle = stream_infos[1]
            stream_type = stream_infos[2]
            stream_state = stream_infos[3]
            stream_direction = stream_infos[4]
            pending_flags = stream_infos[5]
            print("\t== Stream Info ==")
            print("\t%s stream (%d) offered by %s. State: %s || Direction %s || Flags %s" % (
                media_stream_type[stream_type], stream_id,
                self.__get_contacts_name([contact_handle])[0],
                media_stream_state[stream_state],
                media_stream_direction[stream_direction], pending_flags))
            print("")

    def __on_session_created(self, channel, conference, participant):
        """
        On signal "session-created", create pipeline and add conference
        """
        print("")
        print("=== %s __on_session_created ===" % self)
        print("")
        self.pipeline = gst.Pipeline()
        self.pipeline.add(conference)
        self.pipeline.set_state(gst.STATE_PLAYING)
        # Forward every bus message to tp-farsight
        self.pipeline.get_bus().add_watch(self.__async_handler)

    def __on_stream_created(self, channel, stream):
        """
        On signal "stream-created", add videosource

        NOTE(review): a video test source is attached whatever the stream's
        media type is, and self.pipeline is assumed to exist already (i.e.
        "session-created" fired first) -- confirm both assumptions.
        """
        print("")
        print("=== %s __on_stream_created ===" % self)
        print("")
        stream.connect("src-pad-added", self.__src_pad_added)
        stream.connect("closed", debug_callback, "closed")
        stream.connect("error", debug_callback, "error")
        stream.connect("free-resource", debug_callback, "free")

        # Source side: a live test pattern. Use "v4l2src ! timeoverlay" as
        # the description instead to send a real camera.
        videosource = gst.parse_bin_from_description(
            "videotestsrc is-live=1 ! timeoverlay", True)
        self.pipeline.add(videosource)
        sink_pad = stream.get_property("sink-pad")
        videosource.get_pad("src").link(sink_pad)
        videosource.set_state(gst.STATE_PLAYING)

    def __async_handler(self, bus, message):
        """
        Check all bus message and redirect them to tf_channel (obligatory)

        Always returns True so that the bus watch stays installed.
        """
        if self.tf_channel is not None:
            self.tf_channel.bus_message(message)
        return True

    def __on_stream_get_codec_config(self, channel, stream_id, media_type,
                                     direction):
        """
        On signal "stream-get-codec-config", returns our codec configuration

        @return: list of farsight.Codec describing the video codecs offered
            (H263, H263-1998, THEORA, H264). The last constructor argument
            is 90000 for each -- presumably the RTP clock rate, TODO confirm.
        """
        print("")
        print("=== %s __on_stream_get_codec_config ===" % self)
        print("")
        codec_names = ["H263", "H263-1998", "THEORA", "H264"]
        return [farsight.Codec(farsight.CODEC_ID_ANY, name,
                               farsight.MEDIA_TYPE_VIDEO, 90000)
                for name in codec_names]

    def __src_pad_added(self, stream, pad, codec):
        """
        On signal "src-pad-added", render the incoming media

        Builds a mixer -> tee -> sink chain matching the stream's media
        type, links the new pad to it and sets the new elements to PLAYING.
        Streams that are neither audio nor video are left unconnected.
        """
        print("")
        print("=== %s __src_pad_added ===" % self)
        print("")
        media_type = stream.get_property("media-type")
        if media_type == farsight.MEDIA_TYPE_AUDIO:
            sink = gst.parse_bin_from_description(
                "queue ! audioconvert ! audioresample ! audioconvert ! autoaudiosink", True)
            audioadder = gst.element_factory_make("liveadder")
            audiotee = gst.element_factory_make("tee")
            self.pipeline.add(sink)
            self.pipeline.add(audioadder)
            self.pipeline.add(audiotee)
            audiotee.link(sink)
            audioadder.link(audiotee)
            pad.link(audioadder.get_pad("sink%d"))
            sink.set_state(gst.STATE_PLAYING)
            audiotee.set_state(gst.STATE_PLAYING)
            audioadder.set_state(gst.STATE_PLAYING)
        elif media_type == farsight.MEDIA_TYPE_VIDEO:
            sink = gst.parse_bin_from_description(
                "queue ! ffmpegcolorspace ! videoscale ! autovideosink", True)
            videotee = gst.element_factory_make("tee")
            videofunnel = gst.element_factory_make("fsfunnel")
            self.pipeline.add(sink)
            self.pipeline.add(videofunnel)
            self.pipeline.add(videotee)
            videofunnel.link(videotee)
            videotee.link(sink)
            pad.link(videofunnel.get_pad("sink%d"))
            sink.set_state(gst.STATE_PLAYING)
            videotee.set_state(gst.STATE_PLAYING)
            videofunnel.set_state(gst.STATE_PLAYING)

    def __on_members_changed(self, *args, **kwargs):
        """
        When members changed in INTERFACE_GROUP of telepathy channel

        The signal payload is ignored; the current, local-pending and
        remote-pending member lists are queried again and printed.
        """
        group = self.t_channel[CHANNEL_INTERFACE_GROUP]
        print("")
        print("Members changed")
        print("t_channel %s" % (self.t_channel,))
        print("\tCurrent members\t\t%s"
              % (self.__describe_members(group.GetMembers()),))
        print("\tLocal pending members\t%s"
              % (self.__describe_members(group.GetLocalPendingMembers()),))
        print("\tRemote pending members\t%s"
              % (self.__describe_members(group.GetRemotePendingMembers()),))
        print("")

    def __describe_members(self, handles):
        """
        Return "( handle ) name" for every handle, separated by spaces

        All names are resolved with a single InspectHandles call; an empty
        handle list yields an empty string without any D-Bus call.
        """
        if not handles:
            return ""
        names = self.__get_contacts_name(handles)
        return " ".join(["( %s ) %s" % (handle, name)
                         for handle, name in zip(handles, names)])

    def __get_contacts_name(self, contact_handles):
        """
        Return an array with contact's name
        """
        return self.conn[CONNECTION].InspectHandles(HANDLE_TYPE_CONTACT,
                                                    contact_handles)
##################################################
##################################################
# Exactly one argument is expected and it must name a known mode; anything
# else used to be silently treated as "slave".
if len(sys.argv) != 2 or sys.argv[1] not in ("master", "slave"):
    print("Usage: %s {master|slave}" % sys.argv[0])
    sys.exit(1)

# `mode` and `conn` are module globals read by the signal handlers above.
mode = sys.argv[1]
print("=== %s ===" % sys.argv[0])
print("Mode: %s" % mode)

# Pick the credentials matching the mode
if mode == "master":
    parameters = {"account": MASTER_ACCOUNT, "password": MASTER_PASSWORD}
else:
    parameters = {"account": SLAVE_ACCOUNT, "password": SLAVE_PASSWORD}

##################################################
# Get the requested manager
reg = telepathy.client.ManagerRegistry()
reg.LoadManagers()
manager = reg.GetManager("gabble")

# Request connection
conn_bus_name, conn_object_path = manager[CONNECTION_MANAGER].\
    RequestConnection("jabber", parameters)
conn = telepathy.client.Connection(conn_bus_name, conn_object_path)

# Listen signals
conn[CONNECTION].connect_to_signal("NewChannel", on_new_channel_user)
conn[CONNECTION].connect_to_signal("StatusChanged", on_status_changed_user)

# Connect
conn[CONNECTION].Connect()

# Wait
loop = gobject.MainLoop()
try:
    loop.run()
except KeyboardInterrupt:
    print('interrupted')
finally:
    # The connection lives in the connection manager, not in this process.
    # Tear it down explicitly so that it does not linger after exit and
    # interfere with the next launch (NOTE(review): confirm with gabble).
    try:
        conn[CONNECTION].Disconnect()
    except dbus.exceptions.DBusException as e:
        print("Could not disconnect cleanly: %s" % (e,))
_______________________________________________
telepathy mailing list
[email protected]
http://lists.freedesktop.org/mailman/listinfo/telepathy