Right now if for some reason one replica stops requesting frames from
replicator but dows not deactivates whole delivery process stops as
replicator patiently waits for this replica to request frames.
This patch adds optional frame delivery timeout to StreamReplicator
which functions as follows:
After master replica receives frame, callback is scheduled to be
called after frameDeliveryTimeout microseconds. This callback
deactivates all replicas that didn't manage to request current frame in
time and advances replicator to next frame. Using 0 as frameDeliveryTimeout
disables this feature. Using this feature will allow other replicas to
operate mostly unaffected (other then onetime slowdown in delivery) by
such misbehaving replica.
As a side effect this patch arranges all replicas created by
replicator in a list, which may be good for debugging purposes (right
now replicator does not hold are references to replicas that does not
request anything).
---
liveMedia/StreamReplicator.cpp | 53 +++++++++++++++++++++++++++++------
liveMedia/include/StreamReplicator.hh | 10 +++++--
2 files changed, 53 insertions(+), 10 deletions(-)
--
Stas Tsymbalov
TrueConf LLC
http://trueconf.com/
diff --git a/liveMedia/StreamReplicator.cpp b/liveMedia/StreamReplicator.cpp
index e910eff..9ae4bde 100644
--- a/liveMedia/StreamReplicator.cpp
+++ b/liveMedia/StreamReplicator.cpp
@@ -41,29 +41,36 @@ private:
// Replicas that are currently awaiting data are kept in a (singly-linked) list:
StreamReplica* fNext;
+ StreamReplica* fAllNext;
};
////////// StreamReplicator implementation //////////
-StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) {
- return new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies);
+StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies, unsigned frameDeliveryTimeout) {
+ return new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies, frameDeliveryTimeout);
}
-StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies)
+StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies, unsigned frameDeliveryTimeout)
: Medium(env),
fInputSource(inputSource), fDeleteWhenLastReplicaDies(deleteWhenLastReplicaDies), fInputSourceHasClosed(False),
fNumReplicas(0), fNumActiveReplicas(0), fNumDeliveriesMadeSoFar(0),
- fFrameIndex(0), fMasterReplica(NULL), fReplicasAwaitingCurrentFrame(NULL), fReplicasAwaitingNextFrame(NULL) {
+ fFrameIndex(0),
+ fFrameDeliveryTimeout(frameDeliveryTimeout), fFrameDeliveryToken(0),
+ fAllReplicas(NULL), fMasterReplica(NULL), fReplicasAwaitingCurrentFrame(NULL), fReplicasAwaitingNextFrame(NULL) {
}
StreamReplicator::~StreamReplicator() {
+ envir().taskScheduler().unscheduleDelayedTask(fFrameDeliveryToken);
Medium::close(fInputSource);
}
FramedSource* StreamReplicator::createStreamReplica() {
++fNumReplicas;
- return new StreamReplica(*this);
+ StreamReplica* replica = new StreamReplica(*this);
+ replica->fAllNext = fAllReplicas;
+ fAllReplicas = replica;
+ return replica;
}
void StreamReplicator::getNextFrame(StreamReplica* replica) {
@@ -198,9 +205,18 @@ void StreamReplicator::removeStreamReplica(StreamReplica* replicaBeingRemoved) {
deactivateStreamReplica(replicaBeingRemoved);
// Assert: fNumReplicas > 0
- if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::removeStreamReplica() Internal Error!\n"); // should not happen
+ if (fNumReplicas == 0 && fAllReplicas != NULL) fprintf(stderr, "StreamReplicator::removeStreamReplica() Internal Error!\n"); // should not happen
--fNumReplicas;
+ if (replicaBeingRemoved == fAllReplicas)
+ fAllReplicas = replicaBeingRemoved->fAllNext;
+ else
+ for (StreamReplica* r = fAllReplicas; r->fAllNext != NULL; r = r->fAllNext)
+ if (r->fAllNext == replicaBeingRemoved) {
+ r->fAllNext = replicaBeingRemoved->fAllNext;
+ break;
+ }
+
// If this was the last replica, then delete ourselves (if we were set up to do so):
if (fNumReplicas == 0 && fDeleteWhenLastReplicaDies) {
Medium::close(this);
@@ -216,12 +232,15 @@ void StreamReplicator::afterGettingFrame(void* clientData, unsigned frameSize, u
void StreamReplicator::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,
struct timeval presentationTime, unsigned durationInMicroseconds) {
// The frame was read into our master replica's buffer. Update the master replica's state, but don't complete delivery to it
- // just yet. We do that later, after we're sure that we've delivered it to all other replicas.
+ // just yet. We do that later, after we're sure that we've delivered it to all other replicas, or frame delivery timeout occur.
fMasterReplica->fFrameSize = frameSize;
fMasterReplica->fNumTruncatedBytes = numTruncatedBytes;
fMasterReplica->fPresentationTime = presentationTime;
fMasterReplica->fDurationInMicroseconds = durationInMicroseconds;
+ if (fFrameDeliveryTimeout > 0)
+ fFrameDeliveryToken = envir().taskScheduler().scheduleDelayedTask(fFrameDeliveryTimeout, onFrameDeliveryTimeout, this);
+
deliverReceivedFrame();
}
@@ -250,6 +269,22 @@ void StreamReplicator::onSourceClosure() {
}
}
+void StreamReplicator::onFrameDeliveryTimeout(void* clientData) {
+ ((StreamReplicator*)clientData)->onFrameDeliveryTimeout();
+}
+
+void StreamReplicator::onFrameDeliveryTimeout() {
+ fFrameDeliveryToken = 0;
+
+ // We deactivate all replicas that didn't request current frame, so we won't have to wait for them next time
+ for (StreamReplica* replica = fAllReplicas; replica != NULL; replica = replica->fAllNext)
+ if (replica != fMasterReplica && replica->fFrameIndex == fFrameIndex && !replica->isCurrentlyAwaitingData())
+ deactivateStreamReplica(replica);
+
+ // Now we should be requesting next frame, as last call to deactivateStreamReplica() should have triggered
+ // delivery to master replica.
+}
+
void StreamReplicator::deliverReceivedFrame() {
// The 'master replica' has received its copy of the current frame.
// Copy it (and complete delivery) to any other replica that has requested this frame.
@@ -280,6 +315,8 @@ void StreamReplicator::deliverReceivedFrame() {
fFrameIndex = 1 - fFrameIndex; // toggle it (0<->1) for the next frame
fNumDeliveriesMadeSoFar = 0; // reset for the next frame
+ envir().taskScheduler().unscheduleDelayedTask(fFrameDeliveryToken);
+
if (fReplicasAwaitingNextFrame != NULL) {
// One of the other replicas has already requested the next frame, so make it the next 'master replica':
fMasterReplica = fReplicasAwaitingNextFrame;
@@ -307,7 +344,7 @@ void StreamReplicator::deliverReceivedFrame() {
StreamReplica::StreamReplica(StreamReplicator& ourReplicator)
: FramedSource(ourReplicator.envir()),
fOurReplicator(ourReplicator),
- fFrameIndex(-1/*we haven't started playing yet*/), fNext(NULL) {
+ fFrameIndex(-1/*we haven't started playing yet*/), fNext(NULL), fAllNext(NULL) {
}
StreamReplica::~StreamReplica() {
diff --git a/liveMedia/include/StreamReplicator.hh b/liveMedia/include/StreamReplicator.hh
index 42a49cb..a8a3c07 100644
--- a/liveMedia/include/StreamReplicator.hh
+++ b/liveMedia/include/StreamReplicator.hh
@@ -29,7 +29,7 @@ class StreamReplica; // forward
class StreamReplicator: public Medium {
public:
- static StreamReplicator* createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies = True);
+ static StreamReplicator* createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies = True, unsigned frameDeliveryTimeout = 0);
// If "deleteWhenLastReplicaDies" is True (the default), then the "StreamReplicator" object is deleted when (and only when)
// all replicas have been deleted. (In this case, you must *not* call "Medium::close()" on the "StreamReplicator" object,
// unless you never created any replicas from it to begin with.)
@@ -47,7 +47,7 @@ public:
void detachInputSource() { fInputSource = NULL; }
protected:
- StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies);
+ StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies, unsigned frameDeliveryTimeout);
// called only by "createNew()"
virtual ~StreamReplicator();
@@ -69,6 +69,9 @@ private:
static void onSourceClosure(void* clientData);
void onSourceClosure();
+ static void onFrameDeliveryTimeout(void* clientData);
+ void onFrameDeliveryTimeout();
+
void deliverReceivedFrame();
private:
@@ -76,7 +79,10 @@ private:
Boolean fDeleteWhenLastReplicaDies, fInputSourceHasClosed;
unsigned fNumReplicas, fNumActiveReplicas, fNumDeliveriesMadeSoFar;
int fFrameIndex; // 0 or 1; used to figure out if a replica is requesting the current frame, or the next frame
+ unsigned fFrameDeliveryTimeout;
+ TaskToken fFrameDeliveryToken;
+ StreamReplica* fAllReplicas;
StreamReplica* fMasterReplica; // the first replica that requests each frame. We use its buffer when copying to the others.
StreamReplica* fReplicasAwaitingCurrentFrame; // other than the 'master' replica
StreamReplica* fReplicasAwaitingNextFrame; // replicas that have already received the current frame, and have asked for the next
_______________________________________________
live-devel mailing list
[email protected]
http://lists.live555.com/mailman/listinfo/live-devel