A given packet won't always come in contiguously; sometimes
they may be broken up on chunk boundaries by packets of another
channel.
This support primarily involves tracking information about the
data that's been read, so the reader can pick up where it left
off for a given channel.
As a side effect, we no longer over-report the bytes read if
(toread = MIN(size, chunk_size)) == size
---
libavformat/rtmppkt.c | 55 ++++++++++++++++++++++++++++++++++++-----------
libavformat/rtmppkt.h | 14 ++++++++++--
libavformat/rtmpproto.c | 9 +++++---
3 files changed, 60 insertions(+), 18 deletions(-)
diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
index 8f39122..7619d28 100644
--- a/libavformat/rtmppkt.c
+++ b/libavformat/rtmppkt.c
@@ -130,28 +130,32 @@ int ff_amf_read_null(GetByteContext *bc)
}
int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
- int chunk_size, RTMPPacket *prev_pkt)
+ int chunk_size, RTMPPacket *prev_pkt,
+ int *retry_header)
{
uint8_t hdr;
- if (ffurl_read(h, &hdr, 1) != 1)
+ if (*retry_header >= 0) {
+ hdr = *retry_header & 0xFF;
+ *retry_header = -1;
+ } else if (ffurl_read(h, &hdr, 1) != 1)
return AVERROR(EIO);
- return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt, hdr);
+ return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt, hdr,
retry_header);
}
int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size,
- RTMPPacket *prev_pkt, uint8_t hdr)
+ RTMPPacket *prev_pkt, uint8_t hdr,
+ int* retry_header)
{
uint8_t t, buf[16];
- int channel_id, timestamp, size, offset = 0;
+ int channel_id, timestamp, size;
uint32_t extra = 0;
enum RTMPPacketType type;
int written = 0;
int ret;
- written++;
channel_id = hdr & 0x3F;
if (channel_id < 2) { //special case for channel number >= 64
@@ -198,9 +202,26 @@ int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket
*p, int chunk_size,
if (hdr != RTMP_PS_TWELVEBYTES)
timestamp += prev_pkt[channel_id].timestamp;
+ if (!prev_pkt[channel_id].read) {
if ((ret = ff_rtmp_packet_create(p, channel_id, type, timestamp,
size)) < 0)
return ret;
+ // include 1-byte header from ff_rtmp_packet_read
+ p->read = written + 1;
+ p->offset = 0;
+ } else {
+ // previous packet in this channel hasn't completed reading
+ RTMPPacket *prev = &prev_pkt[channel_id];
+ p->data = prev->data;
+ p->size = prev->size;
+ p->channel_id = prev->channel_id;
+ p->type = prev->type;
+ p->ts_delta = prev->ts_delta;
+ p->extra = prev->extra;
+ p->offset = prev->offset;
+ p->read = prev->read + written;
+ prev->data = NULL;
+ }
p->extra = extra;
// save history
prev_pkt[channel_id].channel_id = channel_id;
@@ -209,26 +230,34 @@ int ff_rtmp_packet_read_internal(URLContext *h,
RTMPPacket *p, int chunk_size,
prev_pkt[channel_id].ts_delta = timestamp -
prev_pkt[channel_id].timestamp;
prev_pkt[channel_id].timestamp = timestamp;
prev_pkt[channel_id].extra = extra;
+ size = size - p->offset;
while (size > 0) {
int toread = FFMIN(size, chunk_size);
- if (ffurl_read_complete(h, p->data + offset, toread) != toread) {
+ if (ffurl_read_complete(h, p->data + p->offset, toread) != toread) {
ff_rtmp_packet_destroy(p);
return AVERROR(EIO);
}
size -= chunk_size;
- offset += chunk_size;
- written += chunk_size;
+ p->read += toread;
+ p->offset += toread;
if (size > 0) {
if ((ret = ffurl_read_complete(h, &t, 1)) < 0) { // marker
ff_rtmp_packet_destroy(p);
return ret;
}
- written++;
- if (t != (0xC0 + channel_id))
- return -1;
+ p->read++;
+ if (t != (0xC0 + channel_id)) {
+ RTMPPacket *prev = &prev_pkt[channel_id];
+ *retry_header = t;
+ prev->data = p->data;
+ prev->read = p->read;
+ prev->offset = p->offset;
+ return 0; // should lead to AVERROR(EAGAIN)
+ }
}
}
- return written;
+ prev_pkt[channel_id].read = 0; // read complete; reset if needed
+ return p->read;
}
int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt,
diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
index e3120be..768a22e 100644
--- a/libavformat/rtmppkt.h
+++ b/libavformat/rtmppkt.h
@@ -82,6 +82,8 @@ typedef struct RTMPPacket {
uint32_t extra; ///< probably an additional channel ID used
during streaming data
uint8_t *data; ///< packet payload
int size; ///< packet payload size
+ int offset; ///< amount of data read so far
+ int read; ///< amount read, including headers
} RTMPPacket;
/**
@@ -112,10 +114,15 @@ void ff_rtmp_packet_destroy(RTMPPacket *pkt);
* @param chunk_size current chunk size
* @param prev_pkt previously read packet headers for all channels
* (may be needed for restoring incomplete packet header)
+ * @param retry_header indicates a previously read header byte to use
+ for restoring an incomplete packet. On return,
+ can indicate the byte to cache for the next
+ iteration if needed. Values <0 are ignored.
* @return number of bytes read on success, negative value otherwise
*/
int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
- int chunk_size, RTMPPacket *prev_pkt);
+ int chunk_size, RTMPPacket *prev_pkt,
+ int *retry_header);
/**
* Read internal RTMP packet sent by the server.
*
@@ -125,10 +132,13 @@ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
* @param prev_pkt previously read packet headers for all channels
* (may be needed for restoring incomplete packet header)
* @param c the first byte already read
+ * @param retry_header Output parameter that indicates the first
+ header byte to cache if needed. <0 if unneeded.
* @return number of bytes read on success, negative value otherwise
*/
int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size,
- RTMPPacket *prev_pkt, uint8_t c);
+ RTMPPacket *prev_pkt, uint8_t c,
+ int *retry_header);
/**
* Send RTMP packet to the server.
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index dc14e56..1e769d7 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -122,6 +122,7 @@ typedef struct RTMPContext {
char auth_params[500];
int do_reconnect;
int auth_tried;
+ int retry_header;
} RTMPContext;
#define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for
first client digest signing
@@ -405,7 +406,7 @@ static int read_connect(URLContext *s, RTMPContext *rt)
GetByteContext gbc;
if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->in_chunk_size,
- rt->prev_pkt[1])) < 0)
+ rt->prev_pkt[1], &rt->retry_header)) < 0)
return ret;
cp = pkt.data;
bytestream2_init(&gbc, cp, pkt.size);
@@ -2162,7 +2163,7 @@ static int get_packet(URLContext *s, int for_header)
for (;;) {
RTMPPacket rpkt = { 0 };
if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt,
- rt->in_chunk_size, rt->prev_pkt[0])) <=
0) {
+ rt->in_chunk_size, rt->prev_pkt[0],
&rt->retry_header)) <= 0) {
if (ret == 0) {
return AVERROR(EAGAIN);
} else {
@@ -2311,6 +2312,7 @@ static int rtmp_open(URLContext *s, const char *uri, int
flags)
rt->listen = 1;
rt->is_input = !(flags & AVIO_FLAG_WRITE);
+ rt->retry_header = -1;
av_url_split(proto, sizeof(proto), auth, sizeof(auth),
hostname, sizeof(hostname), &port,
@@ -2681,7 +2683,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf,
int size)
if ((ret = ff_rtmp_packet_read_internal(rt->stream, &rpkt,
rt->in_chunk_size,
- rt->prev_pkt[0], c)) <= 0)
+ rt->prev_pkt[0], c,
+ &rt->retry_header)) <= 0)
return ret;
if ((ret = rtmp_parse_result(s, rt, &rpkt)) < 0)
--
1.7.9.5
_______________________________________________
libav-devel mailing list
[email protected]
https://lists.libav.org/mailman/listinfo/libav-devel