Title: [192765] trunk
Revision
192765
Author
[email protected]
Date
2015-11-24 10:47:19 -0800 (Tue, 24 Nov 2015)

Log Message

[Streams API] Implement pipeTo method in readable Stream
https://bugs.webkit.org/show_bug.cgi?id=151588

Reviewed by Darin Adler.

Source/WebCore:

Implemented the pipeTo method following the reference implementation, because the spec text for this method is not
written yet. The reference implementation can be found at https://github.com/whatwg/streams/blob/632b26a05f3106650b1ec91239ad5b012e6c64af/reference-implementation/lib/readable-stream.js#L75.

Tests: streams/pipe-to.html
       streams/reference-implementation/brand-checks.html
       streams/reference-implementation/pipe-through.html
       streams/reference-implementation/pipe-to.html
       streams/reference-implementation/pipe-to-options.html
       streams/reference-implementation/readable-stream-templated

* Modules/streams/ReadableStream.js:
(doPipe): Internal function of pipeTo.
(closeDest): Internal function of pipeTo.
(abortDest): Internal function of pipeTo.
(pipeTo): Implemented as per spec with some other internal functions as helpers.
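
For readers unfamiliar with what pipeTo has to do, the following is a minimal sketch of a pipe loop. It is not the code
added to ReadableStream.js: the name simplePipeTo is hypothetical, and the sketch assumes the reader/writer API of the
later Streams Standard (getReader(), getWriter(), writer.ready) instead of the ws.state based design used by the
reference implementation at the time. The three options match the preventClose, preventAbort and preventCancel options
exercised by the tests listed above.

```javascript
// Hypothetical illustration only; not the WebKit implementation.
async function simplePipeTo(source, dest, options = {}) {
    const { preventClose = false, preventAbort = false, preventCancel = false } = options;
    const reader = source.getReader();
    const writer = dest.getWriter();

    while (true) {
        let result;
        try {
            result = await reader.read();
        } catch (sourceError) {
            // The source errored: propagate forward by aborting the destination.
            if (!preventAbort)
                await writer.abort(sourceError);
            reader.releaseLock();
            writer.releaseLock();
            throw sourceError;
        }

        if (result.done) {
            // The source closed: close the destination unless asked not to.
            if (!preventClose)
                await writer.close();
            reader.releaseLock();
            writer.releaseLock();
            return;
        }

        try {
            // Waiting on writer.ready is what lets a slow destination hold back the loop.
            await writer.ready;
            await writer.write(result.value);
        } catch (destError) {
            // The destination errored: propagate backward by cancelling the source.
            if (!preventCancel)
                await reader.cancel(destError);
            reader.releaseLock();
            writer.releaseLock();
            throw destError;
        }
    }
}
```

Calling simplePipeTo(rs, ws) on a source that enqueues two chunks and closes should hand both chunks to the sink's
write() and then call its close(), which is the behaviour the "piping with no options and no destination errors" test
checks for the real pipeTo.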

LayoutTests:

Test "Piping to a writable stream that does not consume the writes fast enough exerts backpressure on the
source" was moved to its own file because it causes timing issues.
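
The backpressure signal that this test asserts on is the controller's desiredSize. As a rough, timer-free sketch of
that signal (assuming a runtime that exposes the ReadableStream global and the default queuing strategy with a high
water mark of 1), enqueuing chunks that nobody reads drives desiredSize down:

```javascript
// Illustrative only: no reader is attached, so every chunk stays in the queue.
let controller;
const rs = new ReadableStream({
    start(c) { controller = c; } // start() runs synchronously inside the constructor
});

const desiredSizes = [controller.desiredSize]; // empty queue
for (const chunk of ['a', 'b']) {
    controller.enqueue(chunk);
    desiredSizes.push(controller.desiredSize);
}
console.log(desiredSizes); // [1, 0, -1]
```

In the moved test the expected sequence is [1, 1, 0, -1] instead, because the first two chunks are consumed as soon as
they are enqueued and only 'c' and 'd' accumulate in the queue while the slow write is pending.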

* streams/reference-implementation/pipe-to.html: Moved "Piping to a writable stream that does not consume the
writes fast enough exerts backpressure on the source" test to its own file.
* streams/pipe-to.html: Added with "Piping to a writable stream that does not consume the writes fast enough
exerts backpressure on the source" test.
* streams/reference-implementation/brand-checks.html: Fixed issue with the creation of a ReadableStreamReader.
* streams/pipe-to-expected.txt:
* streams/reference-implementation/brand-checks-expected.txt:
* streams/reference-implementation/pipe-through-expected.txt:
* streams/reference-implementation/pipe-to-expected.txt:
* streams/reference-implementation/pipe-to-options-expected.txt:
* streams/reference-implementation/readable-stream-templated-expected.txt: Expectations.
* platform/mac/TestExpectations:
* platform/win/TestExpectations: Flagged pipe-to test because of webkit.org/b/147933.

Diff

Modified: trunk/LayoutTests/ChangeLog (192764 => 192765)


--- trunk/LayoutTests/ChangeLog	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/ChangeLog	2015-11-24 18:47:19 UTC (rev 192765)
@@ -1,3 +1,27 @@
+2015-11-24  Xabier Rodriguez Calvar  <[email protected]>
+
+        [Streams API] Implement pipeTo method in readable Stream
+        https://bugs.webkit.org/show_bug.cgi?id=151588
+
+        Reviewed by Darin Adler.
+
+        Test "Piping to a writable stream that does not consume the writes fast enough exerts backpressure on the
+        source" was moved to its own file because it causes timing issues.
+
+        * streams/reference-implementation/pipe-to.html: Moved "Piping to a writable stream that does not consume the
+        writes fast enough exerts backpressure on the source" test to its own file.
+        * streams/pipe-to.html: Added with "Piping to a writable stream that does not consume the writes fast enough
+        exerts backpressure on the source" test.
+        * streams/reference-implementation/brand-checks.html: Fixed issue with the creation of a ReadableStreamReader.
+        * streams/pipe-to-expected.txt:
+        * streams/reference-implementation/brand-checks-expected.txt:
+        * streams/reference-implementation/pipe-through-expected.txt:
+        * streams/reference-implementation/pipe-to-expected.txt:
+        * streams/reference-implementation/pipe-to-options-expected.txt:
+        * streams/reference-implementation/readable-stream-templated-expected.txt: Expectations.
+        * platform/mac/TestExpectations:
+        * platform/win/TestExpectations: Flagged pipe-to test because of webkit.org/b/147933.
+
 2015-11-24  Antti Koivisto  <[email protected]>
 
         REGRESSION (r190983): Non-element, non-text nodes should not be distributed to slots

Modified: trunk/LayoutTests/platform/mac/TestExpectations (192764 => 192765)


--- trunk/LayoutTests/platform/mac/TestExpectations	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/platform/mac/TestExpectations	2015-11-24 18:47:19 UTC (rev 192765)
@@ -1380,6 +1380,7 @@
 
 # Promises callbacks issues create problems in the writable stream tests
 webkit.org/b/147933 streams/reference-implementation/count-queuing-strategy.html [ Pass Failure ]
+webkit.org/b/147933 streams/reference-implementation/pipe-to.html [ Pass Failure ]
 webkit.org/b/147933 streams/reference-implementation/writable-stream-abort.html [ Pass Failure ]
 
 webkit.org/b/150806 imported/w3c/web-platform-tests/XMLHttpRequest/send-timeout-events.htm [ Pass Failure ]

Modified: trunk/LayoutTests/platform/win/TestExpectations (192764 => 192765)


--- trunk/LayoutTests/platform/win/TestExpectations	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/platform/win/TestExpectations	2015-11-24 18:47:19 UTC (rev 192765)
@@ -3297,8 +3297,9 @@
 
 webkit.org/b/150808 fast/text/multiple-feature-properties.html [ ImageOnlyFailure ]
 
+webkit.org/b/147933 streams/reference-implementation/count-queuing-strategy.html [ Pass Failure ]
+webkit.org/b/147933 streams/reference-implementation/pipe-to.html [ Pass Failure ]
 webkit.org/b/147933 streams/reference-implementation/writable-stream-abort.html [ Pass Failure ]
-webkit.org/b/147933 streams/reference-implementation/count-queuing-strategy.html [ Pass Failure ]
 
 webkit.org/b/150946 [ Debug ] scrollbars/custom-scrollbar-appearance-property.html [ Crash ]
 

Added: trunk/LayoutTests/streams/pipe-to-expected.txt (0 => 192765)


--- trunk/LayoutTests/streams/pipe-to-expected.txt	                        (rev 0)
+++ trunk/LayoutTests/streams/pipe-to-expected.txt	2015-11-24 18:47:19 UTC (rev 192765)
@@ -0,0 +1,3 @@
+
+PASS Piping to a writable stream that does not consume the writes fast enough exerts backpressure on the source 
+

Added: trunk/LayoutTests/streams/pipe-to.html (0 => 192765)


--- trunk/LayoutTests/streams/pipe-to.html	                        (rev 0)
+++ trunk/LayoutTests/streams/pipe-to.html	2015-11-24 18:47:19 UTC (rev 192765)
@@ -0,0 +1,103 @@
+<!DOCTYPE html>
+<script src=''></script>
+<script src=''></script>
+<script src=''></script>
+<script>
+// This is updated till https://github.com/whatwg/streams/commit/ec5ffa036308d9f6350d2946560d48cdbf090939
+
+// This test is alone here for timing reasons though it should be at streams/reference-implementation/pipe-to.html.
+var test24 = async_test('Piping to a writable stream that does not consume the writes fast enough exerts backpressure on the source');
+test24.step(function() {
+    const timeoutMultiplier = 5;
+    var desiredSizes = [];
+    var rs = new ReadableStream({
+        start: function(c) {
+            setTimeout(test24.step_func(function() { enqueue('a'); }), 100 * timeoutMultiplier);
+            setTimeout(test24.step_func(function() { enqueue('b'); }), 200 * timeoutMultiplier);
+            setTimeout(test24.step_func(function() { enqueue('c'); }), 300 * timeoutMultiplier);
+            setTimeout(test24.step_func(function() { enqueue('d'); }), 400 * timeoutMultiplier);
+            setTimeout(test24.step_func(function() { c.close(); }), 500 * timeoutMultiplier);
+
+            function enqueue(chunk) {
+                c.enqueue(chunk);
+                desiredSizes.push(c.desiredSize);
+            }
+        }
+    });
+
+    var chunksGivenToWrite = [];
+    var chunksFinishedWriting = [];
+    var startPromise = Promise.resolve();
+    var ws = new WritableStream({
+        start: function() {
+            return startPromise;
+        },
+        write: function(chunk) {
+            chunksGivenToWrite.push(chunk);
+            return new Promise(test24.step_func(function(resolve) {
+                setTimeout(test24.step_func(function() {
+                    chunksFinishedWriting.push(chunk);
+                    resolve();
+                }), 350 * timeoutMultiplier);
+            }));
+        }
+    });
+
+    startPromise.then(test24.step_func(function() {
+        rs.pipeTo(ws).then(test24.step_func(function() {
+            assert_array_equals(desiredSizes, [1, 1, 0, -1], 'backpressure was correctly exerted at the source');
+            assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks were written');
+            test24.done();
+        }));
+
+        assert_equals(ws.state, 'writable', 'at t = 0 ms, ws should be writable');
+
+        setTimeout(test24.step_func(function() {
+            assert_equals(ws.state, 'waiting', 'at t = 125 ms, ws should be waiting');
+            assert_array_equals(chunksGivenToWrite, ['a'], 'at t = 125 ms, ws.write should have been called with one chunk');
+            assert_array_equals(chunksFinishedWriting, [], 'at t = 125 ms, no chunks should have finished writing');
+
+            // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
+            // promise, leaving room in the queue
+            assert_array_equals(desiredSizes, [1], 'at t = 125 ms, the one enqueued chunk in rs did not cause backpressure');
+        }), 125 * timeoutMultiplier);
+
+        setTimeout(test24.step_func(function() {
+            assert_equals(ws.state, 'waiting', 'at t = 225 ms, ws should be waiting');
+            assert_array_equals(chunksGivenToWrite, ['a'], 'at t = 225 ms, ws.write should have been called with one chunk');
+            assert_array_equals(chunksFinishedWriting, [], 'at t = 225 ms, no chunks should have finished writing');
+
+            // When 'b' was enqueued at 200 ms, the queue was also empty, since immediately after enqueuing 'a' at
+            // t = 100 ms, it was dequeued in order to fulfill the read() call that was made at time t = 0.
+            assert_array_equals(desiredSizes, [1, 1], 'at t = 225 ms, the two enqueued chunks in rs did not cause backpressure');
+        }), 225 * timeoutMultiplier);
+
+        setTimeout(test24.step_func(function() {
+            assert_equals(ws.state, 'waiting', 'at t = 325 ms, ws should be waiting');
+            assert_array_equals(chunksGivenToWrite, ['a'], 'at t = 325 ms, ws.write should have been called with one chunk');
+            assert_array_equals(chunksFinishedWriting, [], 'at t = 325 ms, no chunks should have finished writing');
+
+            // When 'c' was enqueued at 300 ms, the queue was again empty, since at time t = 200 ms when 'b' was enqueued,
+            // it was immediately dequeued in order to fulfill the second read() call that was made at time t = 0.
+            // However, this time there was no pending read request to whisk it away, so after the enqueue desired size is 0.
+            assert_array_equals(desiredSizes, [1, 1, 0], 'at t = 325 ms, the three enqueued chunks in rs did not cause backpressure');
+        }), 325 * timeoutMultiplier);
+
+        setTimeout(test24.step_func(function() {
+            assert_equals(ws.state, 'waiting', 'at t = 425 ms, ws should be waiting');
+            assert_array_equals(chunksGivenToWrite, ['a'], 'at t = 425 ms, ws.write should have been called with one chunk');
+            assert_array_equals(chunksFinishedWriting, [], 'at t = 425 ms, no chunks should have finished writing');
+
+            // When 'd' was enqueued at 400 ms, the queue was *not* empty. 'c' was still in it, since the write() of 'b' will
+            // not finish until t = 100 ms + 350 ms = 450 ms. Thus backpressure should have been exerted.
+            assert_array_equals(desiredSizes, [1, 1, 0, -1], 'at t = 425 ms, the fourth enqueued chunks in rs did cause backpressure');
+        }), 425 * timeoutMultiplier);
+
+        setTimeout(test24.step_func(function() {
+            assert_equals(ws.state, 'waiting', 'at t = 475 ms, ws should be waiting');
+            assert_array_equals(chunksGivenToWrite, ['a', 'b'], 'at t = 475 ms, ws.write should have been called with two chunks');
+            assert_array_equals(chunksFinishedWriting, ['a'], 'at t = 475 ms, one chunk should have finished writing');
+        }), 475 * timeoutMultiplier);
+    }));
+});
+</script>

Modified: trunk/LayoutTests/streams/reference-implementation/brand-checks-expected.txt (192764 => 192765)


--- trunk/LayoutTests/streams/reference-implementation/brand-checks-expected.txt	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/streams/reference-implementation/brand-checks-expected.txt	2015-11-24 18:47:19 UTC (rev 192765)
@@ -4,7 +4,7 @@
 PASS ReadableStream.prototype.cancel enforces a brand check 
 PASS ReadableStream.prototype.getReader enforces a brand check 
 PASS ReadableStream.prototype.pipeThrough works generically on its this and its arguments 
-FAIL ReadableStream.prototype.pipeTo works generically on its this and its arguments pipeTo is not implemented
+PASS ReadableStream.prototype.pipeTo works generically on its this and its arguments 
 PASS ReadableStream.prototype.tee enforces a brand check 
 PASS ReadableStreamReader enforces a brand check on its argument 
 PASS ReadableStreamReader.prototype.closed enforces a brand check 

Modified: trunk/LayoutTests/streams/reference-implementation/brand-checks.html (192764 => 192765)


--- trunk/LayoutTests/streams/reference-implementation/brand-checks.html	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/streams/reference-implementation/brand-checks.html	2015-11-24 18:47:19 UTC (rev 192765)
@@ -25,7 +25,7 @@
 function fakeReadableStream() {
     return {
         cancel: function(reason) { return Promise.resolve(); },
-        getReader: function() { return new ReadableStream(new ReadableStream()); },
+        getReader: function() { return new ReadableStreamReader(new ReadableStream()); },
         pipeThrough: function(obj, options) { return obj.readable; },
         pipeTo: function() { return Promise.resolve(); },
         tee: function() { return [realReadableStream(), realReadableStream()]; }

Modified: trunk/LayoutTests/streams/reference-implementation/pipe-through-expected.txt (192764 => 192765)


--- trunk/LayoutTests/streams/reference-implementation/pipe-through-expected.txt	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/streams/reference-implementation/pipe-through-expected.txt	2015-11-24 18:47:19 UTC (rev 192765)
@@ -1,4 +1,4 @@
 
-FAIL Piping through a duck-typed pass-through transform stream works pipeTo is not implemented
+PASS Piping through a duck-typed pass-through transform stream works 
 FAIL Piping through an identity transform stream will close the destination when the source closes Can't find variable: TransformStream
 

Modified: trunk/LayoutTests/streams/reference-implementation/pipe-to-expected.txt (192764 => 192765)


--- trunk/LayoutTests/streams/reference-implementation/pipe-to-expected.txt	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/streams/reference-implementation/pipe-to-expected.txt	2015-11-24 18:47:19 UTC (rev 192765)
@@ -1,26 +1,25 @@
 
-FAIL Piping from a ReadableStream from which lots of data are readable synchronously pipeTo is not implemented
-FAIL Piping from a ReadableStream in readable state to a WritableStream in closing state pipeTo is not implemented
-FAIL Piping from a ReadableStream in readable state to a WritableStream in errored state pipeTo is not implemented
-FAIL Piping from a ReadableStream in the readable state which becomes closed after pipeTo call to a WritableStream in the writable state pipeTo is not implemented
-FAIL Piping from a ReadableStream in the readable state which becomes errored after pipeTo call to a WritableStream in the writable state pipeTo is not implemented
-FAIL Piping from an empty ReadableStream which becomes non-empty after pipeTo call to a WritableStream in the writable state pipeTo is not implemented
-FAIL Piping from an empty ReadableStream which becomes errored after pipeTo call to a WritableStream in the writable state pipeTo is not implemented
-FAIL Piping from an empty ReadableStream to a WritableStream in the writable state which becomes errored after a pipeTo call pipeTo is not implemented
-FAIL Piping from a non-empty ReadableStream to a WritableStream in the waiting state which becomes writable after a pipeTo call pipeTo is not implemented
-FAIL Piping from a non-empty ReadableStream to a WritableStream in waiting state which becomes errored after a pipeTo call pipeTo is not implemented
-FAIL Piping from a non-empty ReadableStream which becomes errored after pipeTo call to a WritableStream in the waiting state pipeTo is not implemented
-FAIL Piping from a non-empty ReadableStream to a WritableStream in the waiting state where both become ready after a pipeTo pipeTo is not implemented
-FAIL Piping from an empty ReadableStream to a WritableStream in the waiting state which becomes writable after a pipeTo call pipeTo is not implemented
-FAIL Piping from an empty ReadableStream which becomes closed after a pipeTo call to a WritableStream in the waiting state whose writes never complete pipeTo is not implemented
-FAIL Piping from an empty ReadableStream which becomes errored after a pipeTo call to a WritableStream in the waiting state pipeTo is not implemented
-FAIL Piping to a duck-typed asynchronous "writable stream" works pipeTo is not implemented
-FAIL Piping to a stream that has been aborted passes through the error as the cancellation reason pipeTo is not implemented
-FAIL Piping to a stream and then aborting it passes through the error as the cancellation reason pipeTo is not implemented
-FAIL Piping to a stream that has been closed propagates a TypeError cancellation reason backward pipeTo is not implemented
-FAIL Piping to a stream and then closing it propagates a TypeError cancellation reason backward pipeTo is not implemented
-FAIL Piping to a stream that errors on write should pass through the error as the cancellation reason pipeTo is not implemented
-FAIL Piping to a stream that errors on write should not pass through the error if the stream is already closed pipeTo is not implemented
-FAIL Piping to a stream that errors soon after writing should pass through the error as the cancellation reason pipeTo is not implemented
-FAIL Piping to a writable stream that does not consume the writes fast enough exerts backpressure on the source pipeTo is not implemented
+PASS Piping from a ReadableStream from which lots of data are readable synchronously 
+PASS Piping from a ReadableStream in readable state to a WritableStream in closing state 
+PASS Piping from a ReadableStream in readable state to a WritableStream in errored state 
+PASS Piping from a ReadableStream in the readable state which becomes closed after pipeTo call to a WritableStream in the writable state 
+PASS Piping from a ReadableStream in the readable state which becomes errored after pipeTo call to a WritableStream in the writable state 
+PASS Piping from an empty ReadableStream which becomes non-empty after pipeTo call to a WritableStream in the writable state 
+PASS Piping from an empty ReadableStream which becomes errored after pipeTo call to a WritableStream in the writable state 
+PASS Piping from an empty ReadableStream to a WritableStream in the writable state which becomes errored after a pipeTo call 
+PASS Piping from a non-empty ReadableStream to a WritableStream in the waiting state which becomes writable after a pipeTo call 
+PASS Piping from a non-empty ReadableStream to a WritableStream in waiting state which becomes errored after a pipeTo call 
+PASS Piping from a non-empty ReadableStream which becomes errored after pipeTo call to a WritableStream in the waiting state 
+PASS Piping from a non-empty ReadableStream to a WritableStream in the waiting state where both become ready after a pipeTo 
+PASS Piping from an empty ReadableStream to a WritableStream in the waiting state which becomes writable after a pipeTo call 
+PASS Piping from an empty ReadableStream which becomes closed after a pipeTo call to a WritableStream in the waiting state whose writes never complete 
+PASS Piping from an empty ReadableStream which becomes errored after a pipeTo call to a WritableStream in the waiting state 
+PASS Piping to a duck-typed asynchronous "writable stream" works 
+PASS Piping to a stream that has been aborted passes through the error as the cancellation reason 
+PASS Piping to a stream and then aborting it passes through the error as the cancellation reason 
+PASS Piping to a stream that has been closed propagates a TypeError cancellation reason backward 
+PASS Piping to a stream and then closing it propagates a TypeError cancellation reason backward 
+PASS Piping to a stream that errors on write should pass through the error as the cancellation reason 
+PASS Piping to a stream that errors on write should not pass through the error if the stream is already closed 
+PASS Piping to a stream that errors soon after writing should pass through the error as the cancellation reason 
 

Modified: trunk/LayoutTests/streams/reference-implementation/pipe-to-options-expected.txt (192764 => 192765)


--- trunk/LayoutTests/streams/reference-implementation/pipe-to-options-expected.txt	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/streams/reference-implementation/pipe-to-options-expected.txt	2015-11-24 18:47:19 UTC (rev 192765)
@@ -1,5 +1,5 @@
 
-FAIL Piping with no options and a destination error pipeTo is not implemented
-FAIL Piping with { preventCancel: false } and a destination error pipeTo is not implemented
-FAIL Piping with { preventCancel: true } and a destination error pipeTo is not implemented
+PASS Piping with no options and a destination error 
+PASS Piping with { preventCancel: false } and a destination error 
+PASS Piping with { preventCancel: true } and a destination error 
 

Modified: trunk/LayoutTests/streams/reference-implementation/pipe-to.html (192764 => 192765)


--- trunk/LayoutTests/streams/reference-implementation/pipe-to.html	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/streams/reference-implementation/pipe-to.html	2015-11-24 18:47:19 UTC (rev 192765)
@@ -960,97 +960,6 @@
     rs.pipeTo(ws);
 });
 
-var test24 = async_test('Piping to a writable stream that does not consume the writes fast enough exerts backpressure on the source');
-test24.step(function() {
-    var desiredSizes = [];
-    var rs = new ReadableStream({
-        start: function(c) {
-            setTimeout(test24.step_func(function() { enqueue('a'); }), 200);
-            setTimeout(test24.step_func(function() { enqueue('b'); }), 400);
-            setTimeout(test24.step_func(function() { enqueue('c'); }), 600);
-            setTimeout(test24.step_func(function() { enqueue('d'); }), 800);
-            setTimeout(test24.step_func(function() { c.close(); }), 1000);
-
-            function enqueue(chunk) {
-                c.enqueue(chunk);
-                desiredSizes.push(c.desiredSize);
-            }
-        }
-    });
-
-    var chunksGivenToWrite = [];
-    var chunksFinishedWriting = [];
-    var startPromise = Promise.resolve();
-    var ws = new WritableStream({
-        start: function() {
-            return startPromise;
-        },
-        write: function(chunk) {
-            chunksGivenToWrite.push(chunk);
-            return new Promise(test24.step_func(function(resolve) {
-                setTimeout(test24.step_func(function() {
-                    chunksFinishedWriting.push(chunk);
-                    resolve();
-                }), 700);
-            }));
-        }
-    });
-
-    startPromise.then(test24.step_func(function() {
-        rs.pipeTo(ws).then(test24.step_func(function() {
-            assert_array_equals(desiredSizes, [1, 1, 0, -1], 'backpressure was correctly exerted at the source');
-            assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks were written');
-            test24.done();
-        }));
-
-        assert_equals(ws.state, 'writable', 'at t = 0 ms, ws should be writable');
-
-        setTimeout(test24.step_func(function() {
-            assert_equals(ws.state, 'waiting', 'at t = 250 ms, ws should be waiting');
-            assert_array_equals(chunksGivenToWrite, ['a'], 'at t = 250 ms, ws.write should have been called with one chunk');
-            assert_array_equals(chunksFinishedWriting, [], 'at t = 250 ms, no chunks should have finished writing');
-
-            // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
-            // promise, leaving room in the queue
-            assert_array_equals(desiredSizes, [1], 'at t = 250 ms, the one enqueued chunk in rs did not cause backpressure');
-        }), 250);
-
-        setTimeout(test24.step_func(function() {
-            assert_equals(ws.state, 'waiting', 'at t = 450 ms, ws should be waiting');
-            assert_array_equals(chunksGivenToWrite, ['a'], 'at t = 450 ms, ws.write should have been called with one chunk');
-            assert_array_equals(chunksFinishedWriting, [], 'at t = 450 ms, no chunks should have finished writing');
-
-            // When 'b' was enqueued at 200 ms, the queue was also empty, since immediately after enqueuing 'a' at
-            // t = 100 ms, it was dequeued in order to fulfill the read() call that was made at time t = 0.
-            assert_array_equals(desiredSizes, [1, 1], 'at t = 450 ms, the two enqueued chunks in rs did not cause backpressure');
-        }), 450);
-
-        setTimeout(test24.step_func(function() {
-            assert_equals(ws.state, 'waiting', 'at t = 650 ms, ws should be waiting');
-            assert_array_equals(chunksGivenToWrite, ['a'], 'at t = 650 ms, ws.write should have been called with one chunk');
-            assert_array_equals(chunksFinishedWriting, [], 'at t = 650 ms, no chunks should have finished writing');
-
-            // When 'c' was enqueued at 300 ms, the queue was again empty, since at time t = 200 ms when 'b' was enqueued,
-            // it was immediately dequeued in order to fulfill the second read() call that was made at time t = 0.
-            // However, this time there was no pending read request to whisk it away, so after the enqueue desired size is 0.
-            assert_array_equals(desiredSizes, [1, 1, 0], 'at t = 650 ms, the three enqueued chunks in rs did not cause backpressure');
-        }), 650);
-
-        setTimeout(test24.step_func(function() {
-            assert_equals(ws.state, 'waiting', 'at t = 850 ms, ws should be waiting');
-            assert_array_equals(chunksGivenToWrite, ['a'], 'at t = 850 ms, ws.write should have been called with one chunk');
-            assert_array_equals(chunksFinishedWriting, [], 'at t = 850 ms, no chunks should have finished writing');
-
-            // When 'd' was enqueued at 400 ms, the queue was *not* empty. 'c' was still in it, since the write() of 'b' will
-            // not finish until t = 100 ms + 350 ms = 450 ms. Thus backpressure should have been exerted.
-            assert_array_equals(desiredSizes, [1, 1, 0, -1], 'at t = 850 ms, the fourth enqueued chunks in rs did cause backpressure');
-        }), 850);
-
-        setTimeout(test24.step_func(function() {
-            assert_equals(ws.state, 'waiting', 'at t = 950 ms, ws should be waiting');
-            assert_array_equals(chunksGivenToWrite, ['a', 'b'], 'at t = 950 ms, ws.write should have been called with two chunks');
-            assert_array_equals(chunksFinishedWriting, ['a'], 'at t = 950 ms, one chunk should have finished writing');
-        }), 950);
-    }));
-});
+// This test is alone under streams/pipe-to.html for timing reasons.
+// var test24 = async_test('Piping to a writable stream that does not consume the writes fast enough exerts backpressure on the source');
 </script>

Modified: trunk/LayoutTests/streams/reference-implementation/readable-stream-templated-expected.txt (192764 => 192765)


--- trunk/LayoutTests/streams/reference-implementation/readable-stream-templated-expected.txt	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/LayoutTests/streams/reference-implementation/readable-stream-templated-expected.txt	2015-11-24 18:47:19 UTC (rev 192765)
@@ -18,8 +18,8 @@
 PASS cancel() should return a distinct fulfilled promise each time 
 PASS locked should be false 
 PASS getReader() should be OK 
-FAIL piping to a WritableStream in the writable state should close the writable stream pipeTo is not implemented
-FAIL piping to a WritableStream in the writable state with { preventClose: true } should do nothing pipeTo is not implemented
+PASS piping to a WritableStream in the writable state should close the writable stream 
+PASS piping to a WritableStream in the writable state with { preventClose: true } should do nothing 
 PASS should be able to acquire multiple readers if they are released in succession 
 PASS should not be able to acquire a second reader if we don't release the first one 
 PASS Running templatedRSClosedReader with ReadableStream reader (closed before getting reader) 
@@ -40,8 +40,8 @@
 PASS cancel() should return a distinct fulfilled promise each time 
 PASS locked should be false 
 PASS getReader() should be OK 
-FAIL piping to a WritableStream in the writable state should close the writable stream pipeTo is not implemented
-FAIL piping to a WritableStream in the writable state with { preventClose: true } should do nothing pipeTo is not implemented
+PASS piping to a WritableStream in the writable state should close the writable stream 
+PASS piping to a WritableStream in the writable state with { preventClose: true } should do nothing 
 PASS should be able to acquire multiple readers if they are released in succession 
 PASS should not be able to acquire a second reader if we don't release the first one 
 PASS Running templatedRSClosedReader with ReadableStream reader (closed via cancel after getting reader) 
@@ -52,7 +52,7 @@
 PASS releasing the lock should cause closed to reject and change identity 
 PASS cancel() should return a distinct fulfilled promise each time 
 PASS Running templatedRSErrored with ReadableStream (errored via call in start) 
-FAIL piping to a WritableStream in the writable state should abort the writable stream pipeTo is not implemented
+PASS piping to a WritableStream in the writable state should abort the writable stream 
 PASS getReader() should return a reader that acts errored 
 PASS read() twice should give the error each time 
 PASS locked should be false 
@@ -62,14 +62,14 @@
 PASS cancel() should return a distinct rejected promise each time 
 PASS reader cancel() should return a distinct rejected promise each time 
 PASS Running templatedRSErrored with ReadableStream (errored via returning a rejected promise in start) 
-FAIL piping to a WritableStream in the writable state should abort the writable stream pipeTo is not implemented
+PASS piping to a WritableStream in the writable state should abort the writable stream 
 PASS getReader() should return a reader that acts errored 
 PASS read() twice should give the error each time 
 PASS locked should be false 
 PASS Running templatedRSErroredAsyncOnly with ReadableStream (errored via returning a rejected promise in start) reader 
-FAIL piping with no options pipeTo is not implemented
-FAIL piping with { preventAbort: false } pipeTo is not implemented
-FAIL piping with { preventAbort: true } pipeTo is not implemented
+PASS piping with no options 
+PASS piping with { preventAbort: false } 
+PASS piping with { preventAbort: true } 
 PASS Running templatedRSErroredReader with ReadableStream (errored via returning a rejected promise in start) reader 
 PASS closed should reject with the error 
 PASS releasing the lock should cause closed to reject and change identity 
@@ -88,26 +88,26 @@
 PASS read() should return distinct promises each time 
 PASS cancel() after a read() should still give that single read result 
 PASS Running templatedRSTwoChunksClosed with ReadableStream (two chunks enqueued, then closed) 
-FAIL piping with no options and no destination errors pipeTo is not implemented
-FAIL piping with { preventClose: false } and no destination errors pipeTo is not implemented
-FAIL piping with { preventClose: true } and no destination errors pipeTo is not implemented
-FAIL piping with { preventClose: false } and a destination with that errors synchronously pipeTo is not implemented
-FAIL piping with { preventClose: true } and a destination with that errors synchronously pipeTo is not implemented
-FAIL piping with { preventClose: true } and a destination that errors on the last chunk pipeTo is not implemented
+PASS piping with no options and no destination errors 
+PASS piping with { preventClose: false } and no destination errors 
+PASS piping with { preventClose: true } and no destination errors 
+PASS piping with { preventClose: false } and a destination with that errors synchronously 
+PASS piping with { preventClose: true } and a destination with that errors synchronously 
+PASS piping with { preventClose: true } and a destination that errors on the last chunk 
 PASS Running templatedRSTwoChunksClosed with ReadableStream (two chunks enqueued async, then closed) 
-FAIL piping with no options and no destination errors pipeTo is not implemented
-FAIL piping with { preventClose: false } and no destination errors pipeTo is not implemented
-FAIL piping with { preventClose: true } and no destination errors pipeTo is not implemented
-FAIL piping with { preventClose: false } and a destination with that errors synchronously pipeTo is not implemented
-FAIL piping with { preventClose: true } and a destination with that errors synchronously pipeTo is not implemented
-FAIL piping with { preventClose: true } and a destination that errors on the last chunk pipeTo is not implemented
+PASS piping with no options and no destination errors 
+PASS piping with { preventClose: false } and no destination errors 
+PASS piping with { preventClose: true } and no destination errors 
+PASS piping with { preventClose: false } and a destination with that errors synchronously 
+PASS piping with { preventClose: true } and a destination with that errors synchronously 
+PASS piping with { preventClose: true } and a destination that errors on the last chunk 
 PASS Running templatedRSTwoChunksClosed with ReadableStream (two chunks enqueued via pull, then closed) 
-FAIL piping with no options and no destination errors pipeTo is not implemented
-FAIL piping with { preventClose: false } and no destination errors pipeTo is not implemented
-FAIL piping with { preventClose: true } and no destination errors pipeTo is not implemented
-FAIL piping with { preventClose: false } and a destination with that errors synchronously pipeTo is not implemented
-FAIL piping with { preventClose: true } and a destination with that errors synchronously pipeTo is not implemented
-FAIL piping with { preventClose: true } and a destination that errors on the last chunk pipeTo is not implemented
+PASS piping with no options and no destination errors 
+PASS piping with { preventClose: false } and no destination errors 
+PASS piping with { preventClose: true } and no destination errors 
+PASS piping with { preventClose: false } and a destination with that errors synchronously 
+PASS piping with { preventClose: true } and a destination with that errors synchronously 
+PASS piping with { preventClose: true } and a destination that errors on the last chunk 
 PASS Running templatedRSTwoChunksClosedReader with ReadableStream (two chunks enqueued, then closed) reader 
 PASS third read(), without waiting, should give { value: undefined, done: true } 
 PASS third read, with waiting, should give { value: undefined, done: true } 

Modified: trunk/Source/WebCore/ChangeLog (192764 => 192765)


--- trunk/Source/WebCore/ChangeLog	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/Source/WebCore/ChangeLog	2015-11-24 18:47:19 UTC (rev 192765)
@@ -1,3 +1,26 @@
+2015-11-24  Xabier Rodriguez Calvar  <[email protected]>
+
+        [Streams API] Implement pipeTo method in readable Stream
+        https://bugs.webkit.org/show_bug.cgi?id=151588
+
+        Reviewed by Darin Adler.
+
+        Implemented the pipeTo method following the reference implementation of the spec, as the spec text itself
+        is not written yet. The reference implementation can be found at
+        https://github.com/whatwg/streams/blob/632b26a05f3106650b1ec91239ad5b012e6c64af/reference-implementation/lib/readable-stream.js#L75.
+
+        Tests: streams/pipe-to.html
+               streams/reference-implementation/brand-checks.html
+               streams/reference-implementation/pipe-through.html
+               streams/reference-implementation/pipe-to.html
+               streams/reference-implementation/pipe-to-options.html
+               streams/reference-implementation/readable-stream-templated
+
+        * Modules/streams/ReadableStream.js:
+        (doPipe): Internal function of pipeTo.
+        (cancelSource): Internal function of pipeTo.
+        (closeDestination): Internal function of pipeTo.
+        (abortDestination): Internal function of pipeTo.
+        (pipeTo): Implemented following the reference implementation, with the internal functions above as helpers.
+
 2015-11-24  Antti Koivisto  <[email protected]>
 
         REGRESSION (r190983): Non-element, non-text nodes should not be distributed to slots

Modified: trunk/Source/WebCore/Modules/streams/ReadableStream.js (192764 => 192765)


--- trunk/Source/WebCore/Modules/streams/ReadableStream.js	2015-11-24 18:22:03 UTC (rev 192764)
+++ trunk/Source/WebCore/Modules/streams/ReadableStream.js	2015-11-24 18:47:19 UTC (rev 192765)
@@ -99,11 +99,90 @@
     return streams.readable;
 }
 
-function pipeTo(dest)
+function pipeTo(destination, options)
 {
     "use strict";
 
-    throw new @TypeError("pipeTo is not implemented");
+    // We do not shield against overridden methods and attributes of the reader and destination, because those
+    // objects are not necessarily a ReadableStreamReader and a WritableStream.
+
+    const preventClose = @isObject(options) && !!options.preventClose;
+    const preventAbort = @isObject(options) && !!options.preventAbort;
+    const preventCancel = @isObject(options) && !!options.preventCancel;
+
+    const source = this;
+
+    let reader;
+    let lastRead;
+    let lastWrite;
+    let closedPurposefully = false;
+    let promiseCapability;
+
+    function doPipe() {
+        lastRead = reader.read();
+        @Promise.prototype.@then.@call(@Promise.all([lastRead, destination.ready]), function([{ value, done }]) {
+            if (done)
+                closeDestination();
+            else if (destination.state === "writable") {
+                lastWrite = destination.write(value);
+                doPipe();
+            }
+        }, function(e) {
+            throw e;
+        });
+    }
+
+    function cancelSource(reason) {
+        if (!preventCancel) {
+            reader.cancel(reason);
+            reader.releaseLock();
+            promiseCapability.@reject.@call(undefined, reason);
+        } else {
+            @Promise.prototype.@then.@call(lastRead, function() {
+                reader.releaseLock();
+                promiseCapability.@reject.@call(undefined, reason);
+            });
+        }
+    }
+
+    function closeDestination() {
+        reader.releaseLock();
+
+        const destinationState = destination.state;
+        if (!preventClose && (destinationState === "waiting" || destinationState === "writable")) {
+            closedPurposefully = true;
+            @Promise.prototype.@then.@call(destination.close(), promiseCapability.@resolve, promiseCapability.@reject);
+        } else if (lastWrite !== undefined)
+            @Promise.prototype.@then.@call(lastWrite, promiseCapability.@resolve, promiseCapability.@reject);
+        else
+            promiseCapability.@resolve.@call();
+
+    }
+
+    function abortDestination(reason) {
+        reader.releaseLock();
+
+        if (!preventAbort)
+            destination.abort(reason);
+        promiseCapability.@reject.@call(undefined, reason);
+    }
+
+    promiseCapability = @newPromiseCapability(@Promise);
+
+    reader = source.getReader();
+
+    @Promise.prototype.@catch.@call(reader.closed, abortDestination);
+    @Promise.prototype.@then.@call(destination.closed,
+        function() {
+            if (!closedPurposefully)
+                cancelSource(new @TypeError('destination is closing or closed and cannot be piped to anymore'));
+        },
+        cancelSource
+    );
+
+    doPipe();
+    
+    return promiseCapability.@promise;
 }
 
 function tee()
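
The option handling and the branch taken in closeDestination() above can be sketched in plain JavaScript, outside the builtin environment. This is a minimal sketch, not the WebKit code: normalizePipeOptions and decideCloseAction are hypothetical names introduced here, the returned action strings are illustrative only, and the object check is an assumed stand-in for the @isObject builtin.

```javascript
"use strict";

// Hypothetical helper mirroring the three option lines at the top of pipeTo.
// Assumption: @isObject is approximated by "non-null object or function".
function normalizePipeOptions(options) {
    const isObject = options !== null
        && (typeof options === "object" || typeof options === "function");
    return {
        preventClose: isObject && !!options.preventClose,
        preventAbort: isObject && !!options.preventAbort,
        preventCancel: isObject && !!options.preventCancel,
    };
}

// Hypothetical helper mirroring the branch order in closeDestination():
// close the destination if allowed and still open, otherwise wait for the
// last pending write, otherwise resolve the pipeTo promise immediately.
function decideCloseAction(destinationState, preventClose, hasLastWrite) {
    if (!preventClose && (destinationState === "waiting" || destinationState === "writable"))
        return "close-destination";
    if (hasLastWrite)
        return "wait-for-last-write";
    return "resolve";
}
```

With these helpers, a missing or non-object options argument yields all three flags as false, which corresponds to the "piping with no options" cases in the test expectations.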
_______________________________________________
webkit-changes mailing list
[email protected]
https://lists.webkit.org/mailman/listinfo/webkit-changes
