I don't think Eric ever posted the latest version of his hdtv ringbuffer. I am attaching it here, so people have to opportunity to try it. He is on vacation for a couple of weeks.
Mine works a little better for me, when recording three shows and watching a fourth. However, Eric's might work better for other people. I have not done that much testing with Eric's version, so if it does work better I will have to figure out exactly what he does differently and try to come to a happy medium :-)
Eric's concept is the same as mine, and his critique of my code definitely improved it.
John
Index: libs/libmythtv/hdtvrecorder.cpp
===================================================================
RCS file: /var/lib/mythcvs/mythtv/libs/libmythtv/hdtvrecorder.cpp,v
retrieving revision 1.29
diff -d -u -r1.29 hdtvrecorder.cpp
--- libs/libmythtv/hdtvrecorder.cpp 8 Dec 2004 02:04:08 -0000 1.29
+++ libs/libmythtv/hdtvrecorder.cpp 14 Dec 2004 04:02:50 -0000
@@ -62,7 +62,8 @@
#include <sys/ioctl.h>
#include <sys/time.h>
#include <ctime>
-#include "videodev_myth.h"
+#include <sched.h>
+//#include "videodev_myth.h"
using namespace std;
@@ -101,7 +102,7 @@
#endif
HDTVRecorder::HDTVRecorder()
- : RecorderBase(), _atsc_stream_fd(-1), _atsc_stream_data(0), _buffer(0), _buffer_size(0),
+ : RecorderBase(), _atsc_stream_fd(-1), _atsc_stream_data(0),
_error(false), _wait_for_gop(true),
_request_recording(false), _request_pause(false), _recording(false), _paused(false),
_tspacket_count(0), _nullpacket_count(0), _resync_count(0),
@@ -111,13 +112,39 @@
_atsc_stream_data = new ATSCStreamData(DEFAULT_PROGRAM);
//_buffer_size = max((L2_CACHE_SIZE_KB*1024)/2-(16*1024), 8*1024);
- _buffer_size = 2 * 1024 * 1024;
- if ((_buffer = new unsigned char[_buffer_size])) {
- // make valgrind happy, initialize buffer memory
- memset(_buffer, 0xFF, _buffer_size);
+ //_buffer_size = 2 * 1024 * 1024;
+ // if ((_buffer = new unsigned char[_buffer_size])) {
+ // memset(_buffer, 0xFF, _buffer_size);
+ // }
+ ringbuf.size = 64 * 1024 * 1024;
+ ringbuf.wrapextra = TSPacket::SIZE;
+
+ ringbuf.minreadsize = TSPacket::SIZE;
+ ringbuf.maxreadsize = 128 * 1024;
+
+ ringbuf.writesize = 2 * 1024 * 1024;
+
+ if ((ringbuf.buffer = new unsigned char[ringbuf.size + ringbuf.wrapextra]) == NULL) {
+ VERBOSE(VB_IMPORTANT, "Failed to allocate HDTVRecorder ring buffer.");
+ _error = true;
}
+ else {
+ // make valgrind happy, initialize buffer memory
+ memset(ringbuf.buffer, 0xFF, ringbuf.size + ringbuf.wrapextra);
+ ringbuf.paused = false;
+ ringbuf.request_pause = false;
- VERBOSE(VB_RECORD, QString("HD buffer size %1 KB").arg(_buffer_size/1024));
+ ringbuf.used = 0;
+ ringbuf.max_used = 0;
+ ringbuf.avg_cnt = 0;
+ ringbuf.avg_used = 0;
+ ringbuf.tlast = 0;
+
+ VERBOSE(VB_RECORD, QString("HD ring buffer size %1 KB")
+ .arg(ringbuf.size/1024));
+ }
+
+ VERBOSE(VB_RECORD, QString("HD buffer size %1 KB").arg(ringbuf.size/1024));
}
HDTVRecorder::~HDTVRecorder()
@@ -126,8 +153,8 @@
close(_atsc_stream_fd);
if (_atsc_stream_data)
delete _atsc_stream_data;
- if (_buffer)
- delete[] _buffer;
+ if (ringbuf.buffer)
+ delete[] ringbuf.buffer;
}
void HDTVRecorder::SetOption(const QString &opt, int value)
@@ -153,24 +180,20 @@
bool HDTVRecorder::Open()
{
- if (!_atsc_stream_data || !_buffer)
+ if (!_atsc_stream_data || !ringbuf.buffer)
return false;
#if FAKE_VIDEO
// open file instead of device
- if (_atsc_stream_fd >=0)
- {
- int ret = close(_atsc_stream_fd);
- assert(ret);
- }
+ Close();
_atsc_stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR);
VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index]));
fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM;
#else
- if (_atsc_stream_fd <= 0)
+ if (_atsc_stream_fd < 0)
_atsc_stream_fd = open(videodevice.ascii(), O_RDWR);
#endif
- if (_atsc_stream_fd <= 0)
+ if (_atsc_stream_fd < 0)
{
VERBOSE(VB_IMPORTANT, QString("Can't open video device: %1 chanfd = %2")
.arg(videodevice).arg(_atsc_stream_fd));
@@ -179,36 +202,56 @@
return (_atsc_stream_fd>0);
}
-bool readchan(int chanfd, unsigned char* buffer, int dlen) {
+void HDTVRecorder::Close()
+{
+ if (_atsc_stream_fd < 0) return;
+ int ret = close(_atsc_stream_fd);
+ if (ret < 0) {
+ perror("close");
+ }
+ _atsc_stream_fd = -1;
+}
+
+
+
+int readchan(int chanfd, unsigned char* buffer, int dlen) {
int len = read(chanfd, buffer, dlen); // read next byte
if (dlen != len)
{
if (len < 0)
{
+ if (errno == EAGAIN) {
+ return(0);
+ }
VERBOSE(VB_IMPORTANT, QString("HD1 error reading from device"));
perror("read");
}
- else if (len == 0)
- VERBOSE(VB_IMPORTANT, QString("HD2 end of file found in packet"));
- else
- VERBOSE(VB_IMPORTANT, QString("HD3 partial read. This shouldn't happen!"));
+ else if (len == 0) {
+ VERBOSE(VB_IMPORTANT, QString("HD2 end of file found in packet"));
+ } else {
+ VERBOSE(VB_IMPORTANT, QString("HD3 partial read. This shouldn't happen!"));
+ }
}
- return (dlen == len);
+ return(len);
}
bool syncchan(int chanfd, int dlen, int keepsync) {
unsigned char b[188];
int i, j;
+ int len;
+
for (i=0; i<dlen; i++) {
- if (!readchan(chanfd, b, 1))
+ if (readchan(chanfd, b, 1) <= 0)
break;
if (SYNC_BYTE == b[0])
{
- if (readchan(chanfd, &b[1], TSPacket::SIZE-1)) {
+ len = TSPacket::SIZE-1;
+ if (readchan(chanfd, &b[1], len) == len) {
i += (TSPacket::SIZE - 1);
for (j=0; j<keepsync; j++)
{
- if (!readchan(chanfd, b, TSPacket::SIZE))
+ len = TSPacket::SIZE;
+ if (readchan(chanfd, b, len) != len)
return false;
i += TSPacket::SIZE;
if (SYNC_BYTE != b[0])
@@ -232,9 +275,6 @@
void HDTVRecorder::StartRecording(void)
{
- const int unsyncpackets = 50; // unsynced packets to look at before giving up
- const int syncpackets = 10; // synced packets to require before starting recording
-
VERBOSE(VB_RECORD, QString("StartRecording"));
if (!Open())
@@ -253,50 +293,136 @@
_request_recording = true;
_recording = true;
- // sync device stream so it starts with a valid ts packet
- if (!syncchan(_atsc_stream_fd, TSPacket::SIZE*unsyncpackets, syncpackets))
- {
- _error = true;
- return;
- }
- int remainder = 0;
+ ringbuf.rdidx = 0;
+ ringbuf.wridx = 0;
+ ringbuf.run = true;
+ ringbuf.eof = false;
+ ringbuf.paused = false;
+ ringbuf.request_pause = false;
+
+ pthread_create(&ringbuf.thread, NULL, StartRingBuffer, reinterpret_cast<void *>(this));
+
// TRANSFER DATA
while (_request_recording)
{
+ int fifo_depth;
+ int process_len;
+ int remainder;
+
if (_request_pause)
{
- _paused = true;
- pauseWait.wakeAll();
+ ringbuf.mutex.lock();
+ ringbuf.request_pause = true;
+ ringbuf.mutex.unlock();
- usleep(50);
- continue;
- }
+ ringbuf.pauseWait.wait(1000); // wait for ringbuffer pause
- int len = read(_atsc_stream_fd, &(_buffer[remainder]),
- _buffer_size - remainder);
+ ringbuf.mutex.lock();
+ _paused = ringbuf.paused;
+ ringbuf.mutex.unlock();
- if (len < 0)
- {
- VERBOSE(VB_IMPORTANT, QString("HD7 error reading from: %1").
- arg(videodevice));
- perror("read");
+ if (!_paused) {
+ VERBOSE(VB_IMPORTANT,
+ QString("HD ringbuf thread is not pausing"));
+ usleep(1000);
+ continue;
+ }
+
+ pauseWait.wakeAll();
continue;
}
- else if (len > 0)
- {
- len += remainder;
- remainder = ProcessData(_buffer, len);
- if (remainder > 0) // leftover bytes
- memmove(&(_buffer[0]), &(_buffer[_buffer_size - remainder]), remainder);
- }
- else
- {
- VERBOSE(VB_IMPORTANT, QString("HD8 end of file found in packet"));
- break;
- }
+
+ ringbuf.mutex.lock();
+ fifo_depth = ringbuf.wridx - ringbuf.rdidx;
+
+ // Bytes to process
+ process_len = fifo_depth;
+
+ // Wrap case
+ if (process_len < 0) {
+
+ // For stats
+ fifo_depth += ringbuf.size;
+
+ // Cap process_len
+ process_len = ringbuf.size - ringbuf.rdidx;
+ // The writer always copies ringbuf.wrapextra bytes *beyond* the
+ // end of the fifo. This gives us a minimum number of contiguous
+ // bytes for routines that can't deal with the wrap. Another
+ // solution to this problem would be to have all code that accesses
+ // the buffer perform a wrap computation.
+ process_len += (ringbuf.wridx > ringbuf.wrapextra) ?
+ ringbuf.wrapextra : ringbuf.wridx;
+ }
+
+ // Don't process more than maxreadsize
+ if (process_len > ringbuf.maxreadsize) {
+ process_len = ringbuf.maxreadsize;
+ }
+
+ // Enough data to work on?
+ if (fifo_depth < ringbuf.minreadsize) {
+
+ // Writer saw eof?
+ if (ringbuf.eof) {
+ ringbuf.mutex.unlock();
+ break;
+ }
+
+ // Wait for data
+ ringbuf.readWait.wait(&ringbuf.mutex, 1000);
+ ringbuf.mutex.unlock();
+ continue;
+ }
+ ringbuf.mutex.unlock();
+
+ // Got some data...
+
+ // Update Stats/Watermarks -- print information
+ // every 5 seconds.
+ if (fifo_depth > ringbuf.max_used) {
+ ringbuf.max_used = fifo_depth;
+ }
+ ringbuf.avg_used = ((ringbuf.avg_used * ringbuf.avg_cnt) + fifo_depth)
+ / ++ringbuf.avg_cnt;
+ int t = time(NULL);
+ if (t >= ringbuf.tlast + 60) {
+ VERBOSE(VB_IMPORTANT, QString("%1 ringbuf avg %2% max %3% cur %4% samples:%5")
+ .arg(videodevice)
+ .arg((static_cast<double>(ringbuf.avg_used) / ringbuf.size) * 100.0)
+ .arg((static_cast<double>(ringbuf.max_used) / ringbuf.size) * 100.0)
+ .arg((static_cast<double>(fifo_depth) / ringbuf.size) * 100.0)
+ .arg(ringbuf.avg_cnt));
+ ringbuf.tlast = t;
+ ringbuf.avg_cnt = 0;
+ ringbuf.avg_used = 0;
+ }
+
+ // Process data
+ remainder = ProcessData(&ringbuf.buffer[ringbuf.rdidx], process_len);
+
+ ringbuf.mutex.lock();
+ ringbuf.rdidx += (process_len - remainder);
+ if (ringbuf.rdidx >= ringbuf.size) {
+ ringbuf.rdidx -= ringbuf.size;
+ }
+ ringbuf.writeWait.wakeAll();
+ ringbuf.mutex.unlock();
}
+ if (ringbuf.eof) {
+ VERBOSE(VB_IMPORTANT, QString("HD8 end of file found in packet"));
+ }
+
+ // Clean up
+ ringbuf.mutex.lock();
+ ringbuf.run = false;
+ ringbuf.mutex.unlock();
+
+ pthread_join(ringbuf.thread, NULL);
+
FinishRecording();
+ Close(); // close device
_recording = false;
}
@@ -332,6 +458,109 @@
}
}
+
+void *HDTVRecorder::StartRingBuffer(void *arg)
+{
+ HDTVRecorder *hdtvrec = reinterpret_cast<HDTVRecorder *>(arg);
+
+ hdtvrec->fill_ringbuffer();
+ return(NULL);
+}
+
+int HDTVRecorder::fill_ringbuffer()
+{
+ const int unsyncpackets = 50;
+ const int syncpackets = 10;
+ int len;
+ int read_bytes;
+ int run;
+ bool request_pause;
+ int fifo_space;
+
+ // Sync up data stream
+ if (!syncchan(_atsc_stream_fd, TSPacket::SIZE*unsyncpackets, syncpackets))
+ {
+ pthread_exit(reinterpret_cast<void *>(-2));
+ }
+
+ while (1) {
+
+ // Get run status
+ ringbuf.mutex.lock();
+ request_pause = ringbuf.request_pause;
+ run = ringbuf.run;
+ ringbuf.mutex.unlock();
+
+ if (!run) break;
+
+ if (request_pause) {
+ ringbuf.mutex.lock();
+ ringbuf.paused = true;
+ // Ideally we would issue an ioctl() here to tell the
+ // device driver to stop recording, too!! (But that ioctl
+ // currently doesn't exist -- so we'd have to close/reopen
+ // the device. I guess we could do that...) Because if we're
+ // paused long enough, we're going to get a kernel 'buffer overrun'
+ // message. And if we're changing channels, shouldn't we flush
+ // the kernel buffer, too? Note sure what to do here...
+ // Close();
+ // Open(); ??
+ ringbuf.pauseWait.wakeAll();
+ ringbuf.mutex.unlock();
+ usleep(5000);
+ continue;
+ }
+
+ // Get fifo depth
+ ringbuf.mutex.lock();
+ fifo_space = ringbuf.rdidx - ringbuf.wridx;
+ if (fifo_space <= 0) fifo_space += ringbuf.size;
+
+ // Enough space?
+ if (fifo_space < ringbuf.writesize) {
+ fflush(stdout);
+ ringbuf.writeWait.wait(&ringbuf.mutex, ULONG_MAX);
+ ringbuf.mutex.unlock();
+ continue;
+ }
+ ringbuf.mutex.unlock();
+
+ // How much can we safely read?
+ read_bytes = ringbuf.writesize;
+ if (read_bytes + ringbuf.wridx > ringbuf.size) {
+ read_bytes = fifo_space - ringbuf.wridx;
+ }
+
+ len = read(_atsc_stream_fd, &ringbuf.buffer[ringbuf.wridx], read_bytes);
+ if (len < 0) continue;
+ if (len == 0) {
+ ringbuf.mutex.lock();
+ ringbuf.eof = 1;
+ ringbuf.mutex.unlock();
+ break; // Exit
+ }
+
+ // If we loaded below ringbuf.wrapextra, refresh end of buffer
+ if (ringbuf.wridx < ringbuf.wrapextra) {
+ memmove(ringbuf.buffer + ringbuf.size, // to end
+ ringbuf.buffer, // from start
+ ringbuf.wrapextra);
+ }
+
+ // Write pointer increment
+ ringbuf.mutex.lock();
+ ringbuf.wridx += len;
+ if (ringbuf.wridx >= ringbuf.size) ringbuf.wridx -= ringbuf.size;
+ ringbuf.readWait.wakeAll();
+ ringbuf.mutex.unlock();
+ }
+
+ pthread_exit(reinterpret_cast<void *>(0));
+ return(0);
+}
+
+
+
void HDTVRecorder::SetVideoFilters(QString &filters)
{
(void)filters;
@@ -650,30 +879,7 @@
pthread_mutex_unlock(db_lock);
}
- if (_atsc_stream_fd >= 0)
- {
- int ret = close(_atsc_stream_fd);
- if (ret < 0)
- {
- perror("close");
- return;
- }
-#if FAKE_VIDEO
- // open file instead of device
- _atsc_stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR);
- VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index]));
- fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM;
-#else
- _atsc_stream_fd = open(videodevice.ascii(), O_RDWR);
-#endif
- if (_atsc_stream_fd < 0)
- {
- VERBOSE(VB_IMPORTANT, QString("HD1 Can't open video device: %1 chanfd = %2").
- arg(videodevice).arg(_atsc_stream_fd));
- perror("open video");
- return;
- }
- }
+ Close();
StreamData()->Reset();
}
@@ -697,7 +903,7 @@
void HDTVRecorder::WaitForPause(void)
{
if (!_paused)
- if (!pauseWait.wait(1000))
+ if (!pauseWait.wait(2000))
VERBOSE(VB_IMPORTANT, QString("Waited too long for recorder to pause"));
}
@@ -728,7 +934,7 @@
void HDTVRecorder::ChannelNameChanged(const QString& new_chan)
{
- if (!_atsc_stream_data && !_buffer)
+ if (!_atsc_stream_data && !ringbuf.buffer)
return;
RecorderBase::ChannelNameChanged(new_chan);
// look up freqid
Index: libs/libmythtv/hdtvrecorder.h
===================================================================
RCS file: /var/lib/mythcvs/mythtv/libs/libmythtv/hdtvrecorder.h,v
retrieving revision 1.19
diff -d -u -r1.19 hdtvrecorder.h
--- libs/libmythtv/hdtvrecorder.h 8 Dec 2004 02:04:08 -0000 1.19
+++ libs/libmythtv/hdtvrecorder.h 14 Dec 2004 04:02:50 -0000
@@ -41,6 +41,7 @@
long long GetFramesWritten(void);
bool Open(void);
+ void Close(void);
int GetVideoFd(void);
long long GetKeyframePosition(long long desired);
@@ -69,8 +70,38 @@
int _atsc_stream_fd;
ATSCStreamData* _atsc_stream_data;
- unsigned char *_buffer;
- unsigned int _buffer_size;
+
+ // Ring buffer
+ struct {
+ unsigned char *buffer;
+ unsigned int size;
+ unsigned int wrapextra;
+ unsigned int wridx;
+ unsigned int rdidx;
+ int minreadsize;
+ int maxreadsize;
+ int writesize;
+ bool run;
+ bool eof;
+ pthread_t thread;
+ QMutex mutex;
+ QWaitCondition pauseWait;
+ QWaitCondition readWait;
+ QWaitCondition writeWait;
+ int request_pause;
+ int paused;
+ int used;
+ int max_used;
+ int avg_used;
+ int avg_cnt;
+ int tlast;
+ } ringbuf;
+ int WaitForData();
+ int WaitForSpace();
+ static void *StartRingBuffer(void *arg);
+ int fill_ringbuffer();
+
+
bool _error;
// Wait for the a GOP before sending data, this prevents resolution changes from crashing ffmpeg
_______________________________________________ mythtv-dev mailing list [EMAIL PROTECTED] http://mythtv.org/cgi-bin/mailman/listinfo/mythtv-dev
