I don't think Eric ever posted the latest version of his hdtv ringbuffer. I am attaching it here, so people have to opportunity to try it. He is on vacation for a couple of weeks.

Mine works a little better for me, when recording three shows and watching a fourth. However, Eric's might work better for other people. I have not done that much testing with Eric's version, so if it does work better I will have to figure out exactly what he does differently and try to come to a happy medium :-)

Eric's concept is the same as mine, and his critique of my code definitely improved it.

John

Index: libs/libmythtv/hdtvrecorder.cpp
===================================================================
RCS file: /var/lib/mythcvs/mythtv/libs/libmythtv/hdtvrecorder.cpp,v
retrieving revision 1.29
diff -d -u -r1.29 hdtvrecorder.cpp
--- libs/libmythtv/hdtvrecorder.cpp	8 Dec 2004 02:04:08 -0000	1.29
+++ libs/libmythtv/hdtvrecorder.cpp	14 Dec 2004 04:02:50 -0000
@@ -62,7 +62,8 @@
 #include <sys/ioctl.h>
 #include <sys/time.h>
 #include <ctime>
-#include "videodev_myth.h"
+#include <sched.h>
+//#include "videodev_myth.h"
 
 using namespace std;
 
@@ -101,7 +102,7 @@
 #endif
 
 HDTVRecorder::HDTVRecorder()
-    : RecorderBase(), _atsc_stream_fd(-1), _atsc_stream_data(0), _buffer(0), _buffer_size(0),
+    : RecorderBase(), _atsc_stream_fd(-1), _atsc_stream_data(0),
       _error(false), _wait_for_gop(true),
       _request_recording(false), _request_pause(false), _recording(false), _paused(false),
       _tspacket_count(0), _nullpacket_count(0), _resync_count(0),
@@ -111,13 +112,39 @@
     _atsc_stream_data = new ATSCStreamData(DEFAULT_PROGRAM);
 
     //_buffer_size = max((L2_CACHE_SIZE_KB*1024)/2-(16*1024), 8*1024);
-    _buffer_size = 2 * 1024 * 1024;
-    if ((_buffer = new unsigned char[_buffer_size])) {
-        // make valgrind happy, initialize buffer memory
-        memset(_buffer, 0xFF, _buffer_size);
+    //_buffer_size = 2 * 1024 * 1024;
+    //    if ((_buffer = new unsigned char[_buffer_size])) {
+    //        memset(_buffer, 0xFF, _buffer_size);
+    //    }
+    ringbuf.size = 64 * 1024 * 1024;
+    ringbuf.wrapextra = TSPacket::SIZE;
+
+    ringbuf.minreadsize = TSPacket::SIZE;
+    ringbuf.maxreadsize = 128 * 1024;
+
+    ringbuf.writesize = 2 * 1024 * 1024;
+
+    if ((ringbuf.buffer = new unsigned char[ringbuf.size + ringbuf.wrapextra]) == NULL) {
+        VERBOSE(VB_IMPORTANT, "Failed to allocate HDTVRecorder ring buffer.");
+        _error = true;
     }
+    else {
+        // make valgrind happy, initialize buffer memory
+	memset(ringbuf.buffer, 0xFF, ringbuf.size + ringbuf.wrapextra);
+        ringbuf.paused = false;
+        ringbuf.request_pause = false;
 
-    VERBOSE(VB_RECORD, QString("HD buffer size %1 KB").arg(_buffer_size/1024));
+	ringbuf.used = 0;
+	ringbuf.max_used = 0;
+	ringbuf.avg_cnt = 0;
+	ringbuf.avg_used = 0;
+	ringbuf.tlast = 0;
+
+        VERBOSE(VB_RECORD, QString("HD ring buffer size %1 KB")
+                .arg(ringbuf.size/1024));
+   }
+
+    VERBOSE(VB_RECORD, QString("HD buffer size %1 KB").arg(ringbuf.size/1024));
 }
 
 HDTVRecorder::~HDTVRecorder()
@@ -126,8 +153,8 @@
         close(_atsc_stream_fd);
     if (_atsc_stream_data)
         delete _atsc_stream_data;
-    if (_buffer)
-        delete[] _buffer;
+    if (ringbuf.buffer)
+        delete[] ringbuf.buffer;
 }
 
 void HDTVRecorder::SetOption(const QString &opt, int value)
@@ -153,24 +180,20 @@
 
 bool HDTVRecorder::Open()
 {
-    if (!_atsc_stream_data || !_buffer) 
+    if (!_atsc_stream_data || !ringbuf.buffer) 
         return false;
 
 #if FAKE_VIDEO
     // open file instead of device
-    if (_atsc_stream_fd >=0)
-    {
-        int ret = close(_atsc_stream_fd);
-        assert(ret);
-    }
+    Close();
     _atsc_stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR);
     VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index]));
     fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM;
 #else
-    if (_atsc_stream_fd <= 0)
+    if (_atsc_stream_fd < 0)
         _atsc_stream_fd = open(videodevice.ascii(), O_RDWR);
 #endif
-    if (_atsc_stream_fd <= 0)
+    if (_atsc_stream_fd < 0)
     {
         VERBOSE(VB_IMPORTANT, QString("Can't open video device: %1 chanfd = %2")
                 .arg(videodevice).arg(_atsc_stream_fd));
@@ -179,36 +202,56 @@
     return (_atsc_stream_fd>0);
 }
 
-bool readchan(int chanfd, unsigned char* buffer, int dlen) {
+void HDTVRecorder::Close()
+{
+    if (_atsc_stream_fd < 0) return;
+    int ret = close(_atsc_stream_fd);
+    if (ret < 0) {
+	perror("close");
+    }
+    _atsc_stream_fd = -1;
+}
+
+
+
+int readchan(int chanfd, unsigned char* buffer, int dlen) {
     int len = read(chanfd, buffer, dlen); // read next byte
     if (dlen != len)
     {
         if (len < 0)
         {
+	    if (errno == EAGAIN) {
+		return(0);
+	    }
             VERBOSE(VB_IMPORTANT, QString("HD1 error reading from device"));
             perror("read");
         }
-        else if (len == 0)
-            VERBOSE(VB_IMPORTANT, QString("HD2 end of file found in packet"));
-        else 
-            VERBOSE(VB_IMPORTANT, QString("HD3 partial read. This shouldn't happen!"));
+        else if (len == 0) {
+	    VERBOSE(VB_IMPORTANT, QString("HD2 end of file found in packet"));
+	} else {
+	    VERBOSE(VB_IMPORTANT, QString("HD3 partial read. This shouldn't happen!"));
+	}
     }
-    return (dlen == len);
+    return(len);
 }
 
 bool syncchan(int chanfd, int dlen, int keepsync) {
     unsigned char b[188];
     int i, j;
+    int len;
+
     for (i=0; i<dlen; i++) {
-        if (!readchan(chanfd, b, 1))
+        if (readchan(chanfd, b, 1) <= 0)
             break;
         if (SYNC_BYTE == b[0])
         {
-            if (readchan(chanfd, &b[1], TSPacket::SIZE-1)) {
+	    len = TSPacket::SIZE-1;
+            if (readchan(chanfd, &b[1], len) == len) {
                 i += (TSPacket::SIZE - 1);
                 for (j=0; j<keepsync; j++)
                 {
-                    if (!readchan(chanfd, b, TSPacket::SIZE))
+		    len = TSPacket::SIZE;
+                    if (readchan(chanfd, b, len) != len)
                         return false;
                     i += TSPacket::SIZE;
                     if (SYNC_BYTE != b[0])
@@ -232,9 +275,6 @@
 
 void HDTVRecorder::StartRecording(void)
 {
-    const int unsyncpackets = 50; // unsynced packets to look at before giving up
-    const int syncpackets   = 10; // synced packets to require before starting recording
-
     VERBOSE(VB_RECORD, QString("StartRecording"));
 
     if (!Open())
@@ -253,50 +293,136 @@
     _request_recording = true;
     _recording = true;
 
-    // sync device stream so it starts with a valid ts packet
-    if (!syncchan(_atsc_stream_fd, TSPacket::SIZE*unsyncpackets, syncpackets))
-    {
-        _error = true;
-        return;
-    }
-    int remainder = 0;
+    ringbuf.rdidx = 0;
+    ringbuf.wridx = 0;
+    ringbuf.run = true;
+    ringbuf.eof = false;
+    ringbuf.paused = false;
+    ringbuf.request_pause = false;
+
+    pthread_create(&ringbuf.thread, NULL, StartRingBuffer, reinterpret_cast<void *>(this));
+
     // TRANSFER DATA
     while (_request_recording) 
     {
+	int fifo_depth;
+	int process_len;
+	int remainder;
+
         if (_request_pause)
         {
-            _paused = true;
-            pauseWait.wakeAll();
+	    ringbuf.mutex.lock();
+	    ringbuf.request_pause = true;
+	    ringbuf.mutex.unlock();
 
-            usleep(50);
-            continue;
-        }
+	    ringbuf.pauseWait.wait(1000);  // wait for ringbuffer pause
 
-        int len = read(_atsc_stream_fd, &(_buffer[remainder]),
-                       _buffer_size - remainder);
+	    ringbuf.mutex.lock();
+	    _paused = ringbuf.paused;
+	    ringbuf.mutex.unlock();
 
-        if (len < 0)
-        {
-            VERBOSE(VB_IMPORTANT, QString("HD7 error reading from: %1").
-                    arg(videodevice));
-            perror("read");
+	    if (!_paused) {
+		VERBOSE(VB_IMPORTANT,
+		    QString("HD ringbuf thread is not pausing"));
+		usleep(1000);
+		continue;
+	    }
+
+	    pauseWait.wakeAll();
             continue;
         }
-        else if (len > 0)
-        {
-            len += remainder;
-            remainder = ProcessData(_buffer, len);
-            if (remainder > 0) // leftover bytes
-                memmove(&(_buffer[0]), &(_buffer[_buffer_size - remainder]), remainder);
-        }
-        else
-        {
-            VERBOSE(VB_IMPORTANT, QString("HD8 end of file found in packet"));
-            break;
-        }
+
+	ringbuf.mutex.lock();
+	fifo_depth = ringbuf.wridx - ringbuf.rdidx;
+
+	// Bytes to process
+	process_len = fifo_depth;
+
+	// Wrap case
+	if (process_len < 0) {
+
+	    // For stats
+	    fifo_depth += ringbuf.size;
+
+	    // Cap process_len
+	    process_len = ringbuf.size - ringbuf.rdidx;
+	    // The writer always copies ringbuf.wrapextra bytes *beyond* the 
+	    // end of the fifo. This gives us a minimum number of contiguous
+	    // bytes for routines that can't deal with the wrap. Another
+	    // solution to this problem would be to have all code that accesses
+	    // the buffer perform a wrap computation.
+	    process_len += (ringbuf.wridx > ringbuf.wrapextra) ?
+		ringbuf.wrapextra : ringbuf.wridx;
+	}
+	
+	// Don't process more than maxreadsize
+	if (process_len > ringbuf.maxreadsize) {
+	    process_len = ringbuf.maxreadsize;
+	}
+
+	// Enough data to work on?
+	if (fifo_depth < ringbuf.minreadsize) {
+
+	    // Writer saw eof?
+	    if (ringbuf.eof) {
+		ringbuf.mutex.unlock();
+		break;
+	    }
+
+	    // Wait for data
+	    ringbuf.readWait.wait(&ringbuf.mutex, 1000);
+	    ringbuf.mutex.unlock();
+	    continue;
+	}
+	ringbuf.mutex.unlock();
+
+	// Got some data...
+
+	// Update Stats/Watermarks -- print information
+	// every 5 seconds.
+	if (fifo_depth > ringbuf.max_used) {
+	    ringbuf.max_used = fifo_depth;
+	}
+	ringbuf.avg_used = ((ringbuf.avg_used * ringbuf.avg_cnt) + fifo_depth)
+	    / ++ringbuf.avg_cnt;
+	int t = time(NULL);
+	if (t >= ringbuf.tlast + 60) {
+	    VERBOSE(VB_IMPORTANT, QString("%1 ringbuf avg %2% max %3% cur %4% samples:%5")
+		    .arg(videodevice)
+		    .arg((static_cast<double>(ringbuf.avg_used) / ringbuf.size) * 100.0)
+		    .arg((static_cast<double>(ringbuf.max_used) / ringbuf.size) * 100.0)
+		    .arg((static_cast<double>(fifo_depth) / ringbuf.size) * 100.0)
+		    .arg(ringbuf.avg_cnt));
+	    ringbuf.tlast = t;
+	    ringbuf.avg_cnt = 0;
+	    ringbuf.avg_used = 0;
+	}
+
+	// Process data
+	remainder = ProcessData(&ringbuf.buffer[ringbuf.rdidx], process_len);
+
+	ringbuf.mutex.lock();
+	ringbuf.rdidx += (process_len - remainder);
+	if (ringbuf.rdidx >= ringbuf.size) {
+	    ringbuf.rdidx -= ringbuf.size;
+	}
+	ringbuf.writeWait.wakeAll();
+	ringbuf.mutex.unlock();
     }
 
+    if (ringbuf.eof) {
+	VERBOSE(VB_IMPORTANT, QString("HD8 end of file found in packet"));
+    }
+
+    // Clean up
+    ringbuf.mutex.lock();
+    ringbuf.run = false;
+    ringbuf.mutex.unlock();
+
+    pthread_join(ringbuf.thread, NULL);
+
     FinishRecording();
+    Close();  // close device
 
     _recording = false;
 }
@@ -332,6 +458,109 @@
     }
 }
 
+
+void *HDTVRecorder::StartRingBuffer(void *arg)
+{
+    HDTVRecorder *hdtvrec = reinterpret_cast<HDTVRecorder *>(arg);
+
+    hdtvrec->fill_ringbuffer();
+    return(NULL);
+}
+
+int HDTVRecorder::fill_ringbuffer()
+{
+    const int unsyncpackets = 50;
+    const int syncpackets = 10;
+    int       len;
+    int       read_bytes;
+    int       run;
+    bool      request_pause;
+    int       fifo_space;
+
+    // Sync up data stream
+    if (!syncchan(_atsc_stream_fd, TSPacket::SIZE*unsyncpackets, syncpackets))
+    {
+	pthread_exit(reinterpret_cast<void *>(-2));
+    }
+
+    while (1) {
+
+	// Get run status
+	ringbuf.mutex.lock();
+	request_pause = ringbuf.request_pause;
+	run = ringbuf.run;
+	ringbuf.mutex.unlock();
+
+	if (!run) break;
+
+	if (request_pause) {
+	    ringbuf.mutex.lock();
+	    ringbuf.paused = true;
+	    // Ideally we would issue an ioctl() here to tell the
+	    // device driver to stop recording, too!! (But that ioctl
+	    // currently doesn't exist -- so we'd have to close/reopen 
+	    // the device. I guess we could do that...) Because if we're
+	    // paused long enough, we're going to get a kernel 'buffer overrun'
+	    // message. And if we're changing channels, shouldn't we flush
+	    // the kernel buffer, too? Note sure what to do here...
+	    // Close(); 
+	    // Open();  ??
+	    ringbuf.pauseWait.wakeAll();
+	    ringbuf.mutex.unlock();
+	    usleep(5000);
+	    continue;
+	}
+
+	// Get fifo depth
+	ringbuf.mutex.lock();
+	fifo_space = ringbuf.rdidx - ringbuf.wridx;
+	if (fifo_space <= 0) fifo_space += ringbuf.size;
+
+	// Enough space?
+	if (fifo_space < ringbuf.writesize) {
+	    fflush(stdout);
+	    ringbuf.writeWait.wait(&ringbuf.mutex, ULONG_MAX);
+	    ringbuf.mutex.unlock();
+	    continue;
+	}
+	ringbuf.mutex.unlock();
+
+	// How much can we safely read?
+	read_bytes = ringbuf.writesize;
+	if (read_bytes + ringbuf.wridx > ringbuf.size) {
+	    read_bytes = fifo_space - ringbuf.wridx;
+	}
+
+	len = read(_atsc_stream_fd, &ringbuf.buffer[ringbuf.wridx], read_bytes);
+	if (len < 0) continue;
+	if (len == 0) {
+	    ringbuf.mutex.lock();
+	    ringbuf.eof = 1;
+	    ringbuf.mutex.unlock();
+	    break;  // Exit
+	}
+
+	// If we loaded below ringbuf.wrapextra, refresh end of buffer
+	if (ringbuf.wridx < ringbuf.wrapextra) {
+	    memmove(ringbuf.buffer + ringbuf.size,  // to end
+		    ringbuf.buffer,                 // from start
+		    ringbuf.wrapextra);
+	}
+
+	// Write pointer increment
+	ringbuf.mutex.lock();
+	ringbuf.wridx += len;
+	if (ringbuf.wridx >= ringbuf.size) ringbuf.wridx -= ringbuf.size;
+	ringbuf.readWait.wakeAll();
+	ringbuf.mutex.unlock();
+    }
+
+    pthread_exit(reinterpret_cast<void *>(0));
+    return(0);
+}
+
+
+
 void HDTVRecorder::SetVideoFilters(QString &filters)
 {
     (void)filters;
@@ -650,30 +879,7 @@
         pthread_mutex_unlock(db_lock);
     }
 
-    if (_atsc_stream_fd >= 0) 
-    {
-        int ret = close(_atsc_stream_fd);
-        if (ret < 0) 
-        {
-            perror("close");
-            return;
-        }
-#if FAKE_VIDEO
-        // open file instead of device
-        _atsc_stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR);
-        VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index]));
-        fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM;
-#else
-        _atsc_stream_fd = open(videodevice.ascii(), O_RDWR);
-#endif
-        if (_atsc_stream_fd < 0)
-        {
-            VERBOSE(VB_IMPORTANT, QString("HD1 Can't open video device: %1 chanfd = %2").
-                    arg(videodevice).arg(_atsc_stream_fd));
-            perror("open video");
-            return;
-        }
-    }
+    Close();
 
     StreamData()->Reset();
 }
@@ -697,7 +903,7 @@
 void HDTVRecorder::WaitForPause(void)
 {
     if (!_paused)
-        if (!pauseWait.wait(1000))
+        if (!pauseWait.wait(2000))
             VERBOSE(VB_IMPORTANT, QString("Waited too long for recorder to pause"));
 }
 
@@ -728,7 +934,7 @@
 
 void HDTVRecorder::ChannelNameChanged(const QString& new_chan)
 {
-    if (!_atsc_stream_data && !_buffer) 
+    if (!_atsc_stream_data && !ringbuf.buffer) 
         return;
     RecorderBase::ChannelNameChanged(new_chan);
     // look up freqid
Index: libs/libmythtv/hdtvrecorder.h
===================================================================
RCS file: /var/lib/mythcvs/mythtv/libs/libmythtv/hdtvrecorder.h,v
retrieving revision 1.19
diff -d -u -r1.19 hdtvrecorder.h
--- libs/libmythtv/hdtvrecorder.h	8 Dec 2004 02:04:08 -0000	1.19
+++ libs/libmythtv/hdtvrecorder.h	14 Dec 2004 04:02:50 -0000
@@ -41,6 +41,7 @@
     long long GetFramesWritten(void);
 
     bool Open(void);
+    void Close(void);
     int GetVideoFd(void);
 
     long long GetKeyframePosition(long long desired);
@@ -69,8 +70,38 @@
 
     int _atsc_stream_fd;
     ATSCStreamData* _atsc_stream_data;
-    unsigned char *_buffer;
-    unsigned int _buffer_size;
+
+    // Ring buffer
+    struct {
+	unsigned char  *buffer;
+	unsigned int    size;
+	unsigned int    wrapextra;
+	unsigned int    wridx;
+	unsigned int    rdidx;
+	int             minreadsize;
+	int             maxreadsize;
+	int             writesize;
+	bool            run;
+	bool            eof;
+	pthread_t       thread;
+	QMutex          mutex;
+	QWaitCondition  pauseWait;
+	QWaitCondition  readWait;
+	QWaitCondition  writeWait;
+	int             request_pause;
+	int             paused;
+	int             used;
+	int             max_used;
+	int             avg_used;
+	int             avg_cnt;
+	int             tlast;
+    } ringbuf;
+    int WaitForData();
+    int WaitForSpace();
+    static void *StartRingBuffer(void *arg);
+    int fill_ringbuffer();
+
+
     bool _error;
 
     // Wait for the a GOP before sending data, this prevents resolution changes from crashing ffmpeg
_______________________________________________
mythtv-dev mailing list
[EMAIL PROTECTED]
http://mythtv.org/cgi-bin/mailman/listinfo/mythtv-dev

Reply via email to