edrevo commented on a change in pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541#discussion_r650183206



##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))

Review comment:
       nit: if you change `fetch_partition` to take a reference, you can avoid the `.clone()`:
   
   ```suggestion
           let partition_locations = &self.partition[partition];
           let result = future::join_all(partition_locations.iter().map(fetch_partition))
   ```
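
   To illustrate the borrowing point, here is a minimal synchronous sketch. `PartitionLocation` below is a simplified stand-in and `describe` is a hypothetical placeholder for `fetch_partition`; neither is the real Ballista definition. Because the mapped function takes `&PartitionLocation`, `.iter()` is enough and the vector is never cloned. In the async version the returned futures would borrow from `self`, which should be fine as long as they are awaited before that borrow ends.

```rust
// Simplified stand-in for the real type (assumption, not Ballista's definition).
#[derive(Debug, PartialEq)]
struct PartitionLocation {
    host: String,
    port: u16,
}

// Hypothetical placeholder for `fetch_partition`: it only needs to read
// the location, so it takes a shared reference instead of an owned value.
fn describe(location: &PartitionLocation) -> String {
    format!("{}:{}", location.host, location.port)
}

// `.iter()` yields `&PartitionLocation`, which matches the signature of
// `describe`, so the function can be passed to `map` directly, no clone needed.
fn describe_all(locations: &[PartitionLocation]) -> Vec<String> {
    locations.iter().map(describe).collect()
}
```

   Calling `describe_all(&locations)` leaves `locations` usable afterwards, since nothing was moved or copied.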

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),

Review comment:
       > I wonder if it will serialize them all (aka not start reading from the second until the first is entirely consumed)?
   
   Yes, that's exactly what it does: `flatten` reads each inner stream to completion before it starts polling the next one.
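
   The ordering difference can be sketched with plain `std` iterators, which drain inner sources in the same order as the stream `flatten`. This is only an analogy under that assumption: `flatten_sequential` and `merge_round_robin` are hypothetical helpers, not Ballista or `futures` APIs. For real streams, something like `futures::stream::select_all` would be one way to poll all partitions concurrently, if the order of batches across partitions does not matter.

```rust
// Sequential: like `flatten`, drain each inner source fully
// before touching the next one.
fn flatten_sequential(sources: Vec<Vec<u32>>) -> Vec<u32> {
    sources.into_iter().flatten().collect()
}

// Interleaved: take one item from each source per round. This mimics what
// a concurrent merge could yield if every source were always ready.
fn merge_round_robin(sources: Vec<Vec<u32>>) -> Vec<u32> {
    let mut iters: Vec<_> = sources.into_iter().map(|s| s.into_iter()).collect();
    let mut out = Vec::new();
    loop {
        let mut progressed = false;
        for it in iters.iter_mut() {
            if let Some(item) = it.next() {
                out.push(item);
                progressed = true;
            }
        }
        // Stop once a full round produced nothing: all sources are exhausted.
        if !progressed {
            break;
        }
    }
    out
}
```

   With two sources `[1, 2, 3]` and `[10, 20]`, the sequential version yields every item of the first source before any item of the second, while the round-robin version alternates between them.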



