A given packet won't always come in contiguously; sometimes
they may be broken up on chunk boundaries by packets of another
channel.

This support primarily involves tracking information about the
data that's been read, so the reader can pick up where it left
off for a given channel.

As a side effect, we no longer over-report the bytes read if
(toread = MIN(size, chunk_size)) == size
---
 libavformat/rtmppkt.c   |   55 ++++++++++++++++++++++++++++++++++++-----------
 libavformat/rtmppkt.h   |   14 ++++++++++--
 libavformat/rtmpproto.c |    9 +++++---
 3 files changed, 60 insertions(+), 18 deletions(-)

diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
index 8f39122..7619d28 100644
--- a/libavformat/rtmppkt.c
+++ b/libavformat/rtmppkt.c
@@ -130,28 +130,32 @@ int ff_amf_read_null(GetByteContext *bc)
 }
 
 int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
-                        int chunk_size, RTMPPacket *prev_pkt)
+                        int chunk_size, RTMPPacket *prev_pkt,
+                        int *retry_header)
 {
     uint8_t hdr;
 
-    if (ffurl_read(h, &hdr, 1) != 1)
+    if (*retry_header >= 0) {
+        hdr = *retry_header & 0xFF;
+        *retry_header = -1;
+    } else if (ffurl_read(h, &hdr, 1) != 1)
         return AVERROR(EIO);
 
-    return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt, hdr);
+    return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt, hdr, 
retry_header);
 }
 
 int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size,
-                                 RTMPPacket *prev_pkt, uint8_t hdr)
+                                 RTMPPacket *prev_pkt, uint8_t hdr,
+                                 int* retry_header)
 {
 
     uint8_t t, buf[16];
-    int channel_id, timestamp, size, offset = 0;
+    int channel_id, timestamp, size;
     uint32_t extra = 0;
     enum RTMPPacketType type;
     int written = 0;
     int ret;
 
-    written++;
     channel_id = hdr & 0x3F;
 
     if (channel_id < 2) { //special case for channel number >= 64
@@ -198,9 +202,26 @@ int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket 
*p, int chunk_size,
     if (hdr != RTMP_PS_TWELVEBYTES)
         timestamp += prev_pkt[channel_id].timestamp;
 
+    if (!prev_pkt[channel_id].read) {
     if ((ret = ff_rtmp_packet_create(p, channel_id, type, timestamp,
                                      size)) < 0)
         return ret;
+        // include 1-byte header from ff_rtmp_packet_read
+        p->read = written + 1;
+        p->offset = 0;
+    } else {
+        // previous packet in this channel hasn't completed reading
+        RTMPPacket *prev = &prev_pkt[channel_id];
+        p->data          = prev->data;
+        p->size          = prev->size;
+        p->channel_id    = prev->channel_id;
+        p->type          = prev->type;
+        p->ts_delta      = prev->ts_delta;
+        p->extra         = prev->extra;
+        p->offset        = prev->offset;
+        p->read          = prev->read + written;
+        prev->data       = NULL;
+    }
     p->extra = extra;
     // save history
     prev_pkt[channel_id].channel_id = channel_id;
@@ -209,26 +230,34 @@ int ff_rtmp_packet_read_internal(URLContext *h, 
RTMPPacket *p, int chunk_size,
     prev_pkt[channel_id].ts_delta   = timestamp - 
prev_pkt[channel_id].timestamp;
     prev_pkt[channel_id].timestamp  = timestamp;
     prev_pkt[channel_id].extra      = extra;
+    size = size - p->offset;
     while (size > 0) {
         int toread = FFMIN(size, chunk_size);
-        if (ffurl_read_complete(h, p->data + offset, toread) != toread) {
+        if (ffurl_read_complete(h, p->data + p->offset, toread) != toread) {
             ff_rtmp_packet_destroy(p);
             return AVERROR(EIO);
         }
         size    -= chunk_size;
-        offset  += chunk_size;
-        written += chunk_size;
+        p->read   += toread;
+        p->offset += toread;
         if (size > 0) {
             if ((ret = ffurl_read_complete(h, &t, 1)) < 0) { // marker
                 ff_rtmp_packet_destroy(p);
                 return ret;
             }
-            written++;
-            if (t != (0xC0 + channel_id))
-                return -1;
+            p->read++;
+            if (t != (0xC0 + channel_id)) {
+                RTMPPacket *prev = &prev_pkt[channel_id];
+                *retry_header = t;
+                prev->data = p->data;
+                prev->read = p->read;
+                prev->offset = p->offset;
+                return 0; // should lead to AVERROR(EAGAIN)
+            }
         }
     }
-    return written;
+    prev_pkt[channel_id].read = 0; // read complete; reset if needed
+    return p->read;
 }
 
 int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt,
diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
index e3120be..768a22e 100644
--- a/libavformat/rtmppkt.h
+++ b/libavformat/rtmppkt.h
@@ -82,6 +82,8 @@ typedef struct RTMPPacket {
     uint32_t       extra;      ///< probably an additional channel ID used 
during streaming data
     uint8_t        *data;      ///< packet payload
     int            size;       ///< packet payload size
+    int            offset;     ///< amount of data read so far
+    int            read;       ///< amount read, including headers
 } RTMPPacket;
 
 /**
@@ -112,10 +114,15 @@ void ff_rtmp_packet_destroy(RTMPPacket *pkt);
  * @param chunk_size current chunk size
  * @param prev_pkt   previously read packet headers for all channels
  *                   (may be needed for restoring incomplete packet header)
+ * @param retry_header indicates a previously read header byte to use
+                       for restoring an incomplete packet. On return,
+                       can indicate the byte to cache for the next
+                       iteration if needed. Values <0 are ignored.
  * @return number of bytes read on success, negative value otherwise
  */
 int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
-                        int chunk_size, RTMPPacket *prev_pkt);
+                        int chunk_size, RTMPPacket *prev_pkt,
+                        int *retry_header);
 /**
  * Read internal RTMP packet sent by the server.
  *
@@ -125,10 +132,13 @@ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
  * @param prev_pkt   previously read packet headers for all channels
  *                   (may be needed for restoring incomplete packet header)
  * @param c          the first byte already read
+ * @param retry_header Output parameter that indicates the first
+                       header byte to cache if needed. <0 if unneeded.
  * @return number of bytes read on success, negative value otherwise
  */
 int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size,
-                                 RTMPPacket *prev_pkt, uint8_t c);
+                                 RTMPPacket *prev_pkt, uint8_t c,
+                                 int *retry_header);
 
 /**
  * Send RTMP packet to the server.
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index dc14e56..1e769d7 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -122,6 +122,7 @@ typedef struct RTMPContext {
     char          auth_params[500];
     int           do_reconnect;
     int           auth_tried;
+    int           retry_header;
 } RTMPContext;
 
 #define PLAYER_KEY_OPEN_PART_LEN 30   ///< length of partial key used for 
first client digest signing
@@ -405,7 +406,7 @@ static int read_connect(URLContext *s, RTMPContext *rt)
     GetByteContext gbc;
 
     if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->in_chunk_size,
-                                   rt->prev_pkt[1])) < 0)
+                                   rt->prev_pkt[1], &rt->retry_header)) < 0)
         return ret;
     cp = pkt.data;
     bytestream2_init(&gbc, cp, pkt.size);
@@ -2162,7 +2163,7 @@ static int get_packet(URLContext *s, int for_header)
     for (;;) {
         RTMPPacket rpkt = { 0 };
         if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt,
-                                       rt->in_chunk_size, rt->prev_pkt[0])) <= 
0) {
+                                       rt->in_chunk_size, rt->prev_pkt[0], 
&rt->retry_header)) <= 0) {
             if (ret == 0) {
                 return AVERROR(EAGAIN);
             } else {
@@ -2311,6 +2312,7 @@ static int rtmp_open(URLContext *s, const char *uri, int 
flags)
         rt->listen = 1;
 
     rt->is_input = !(flags & AVIO_FLAG_WRITE);
+    rt->retry_header = -1;
 
     av_url_split(proto, sizeof(proto), auth, sizeof(auth),
                  hostname, sizeof(hostname), &port,
@@ -2681,7 +2683,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, 
int size)
 
         if ((ret = ff_rtmp_packet_read_internal(rt->stream, &rpkt,
                                                 rt->in_chunk_size,
-                                                rt->prev_pkt[0], c)) <= 0)
+                                                rt->prev_pkt[0], c,
+                                                &rt->retry_header)) <= 0)
              return ret;
 
         if ((ret = rtmp_parse_result(s, rt, &rpkt)) < 0)
-- 
1.7.9.5

_______________________________________________
libav-devel mailing list
[email protected]
https://lists.libav.org/mailman/listinfo/libav-devel

Reply via email to