[GitHub] drill issue #854: DRILL-5589: JDBC client crashes after successful authentic...

2017-06-15 Thread sohami
Github user sohami commented on the issue:

https://github.com/apache/drill/pull/854
  
Thanks! I added your name as a reviewer in the JIRA.


Thanks,
Sorabh


From: Karthikeyan Manivannan 
Sent: Thursday, June 15, 2017 7:44:31 PM
To: apache/drill
Cc: Sorabh Hamirwasia; Author
Subject: Re: [apache/drill] DRILL-5589: JDBC client crashes after 
successful authentication if tr… (#854)


@bitblender commented on this pull request.

LGTM +1






[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122324232
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -130,6 +127,7 @@
   private int currentIndex = 0;
   private IterOutcome outcome;
   private int numGroupedRecords = 0;
+  private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
--- End diff --

Thanks for the explanation.




[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122324618
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -512,122 +509,122 @@ private void updateEstMaxBatchSize(RecordBatch incoming) {
 }
   }

+  /**
+   *  Read and process (i.e., insert into the hash table and aggregate) records from the current batch.
+   *  Once complete, get the incoming NEXT batch and process it as well, etc.
+   *  For 1st phase, may return when an early output needs to be performed.
+   *
+   * @return Agg outcome status
+   */
   @Override
   public AggOutcome doWork() {
-try {
-  // Note: Keeping the outer and inner try blocks here to maintain some similarity with
-  // StreamingAggregate which does somethings conditionally in the outer try block.
-  // In the future HashAggregate may also need to perform some actions conditionally
-  // in the outer try block.
-
-  assert ! handlingSpills || currentIndex < Integer.MAX_VALUE;

-  outside:
-  while (true) {
+while (true) {

-// This would be called only once - after actual data arrives on incoming
-if ( schema == null && incoming.getRecordCount() > 0 ) {
-  this.schema = incoming.getSchema();
-  // Calculate the number of partitions based on actual incoming data
-  delayedSetup();
-}
+  // This would be called only once - first time actual data arrives on incoming
+  if ( schema == null && incoming.getRecordCount() > 0 ) {
+this.schema = incoming.getSchema();
+currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch
+// Calculate the number of partitions based on actual incoming data
+delayedSetup();
+  }

-// loop through existing records, aggregating the values as necessary.
-if (EXTRA_DEBUG_1) {
-  logger.debug("Starting outer loop of doWork()...");
+  //
+  //  loop through existing records in this batch, aggregating the values as necessary.
+  //
+  if (EXTRA_DEBUG_1) {
+logger.debug("Starting outer loop of doWork()...");
+  }
+  for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+if (EXTRA_DEBUG_2) {
+  logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
 }
-for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-  if (EXTRA_DEBUG_2) {
-logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
-  }
-  checkGroupAndAggrValues(currentIndex);
-  // If adding a group discovered a memory pressure during 1st phase, then start
-  // outputing some partition to free memory.
-  if ( earlyOutput ) {
-outputCurrentBatch();
-incIndex(); // next time continue with the next incoming row
-return AggOutcome.RETURN_OUTCOME;
-  }
+checkGroupAndAggrValues(currentIndex);
+// If adding a group discovered a memory pressure during 1st phase, then start
+// outputing some partition downstream in order to free memory.
+if ( earlyOutput ) {
+  outputCurrentBatch();
+  incIndex(); // next time continue with the next incoming row
+  return AggOutcome.RETURN_OUTCOME;
 }
+  }
+
+  if (EXTRA_DEBUG_1) {
+logger.debug("Processed {} records", underlyingIndex);
+  }

-if (EXTRA_DEBUG_1) {
-  logger.debug("Processed {} records", underlyingIndex);
+  // Cleanup the previous batch since we are done processing it.
+  for (VectorWrapper v : incoming) {
+v.getValueVector().clear();
+  }
+  //
+  // Get the NEXT input batch, initially from the upstream, later (if there was a spill)
+  // from one of the spill files (The spill case is handled differently here to avoid
+  // collecting stats on the spilled records)
+  //
+  if ( handlingSpills ) {
+outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
+  } else {
+long beforeAlloc = allocator.getAllocatedMemory();
+
+// Get the next RecordBatch from the incoming (i.e. upstream operator)
+outcome = outgoing.next(0, incoming);
+
+// If incoming batch is bigger than our estimate - adjust the estimate to match
+long afterAlloc = 

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122324389
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
+  // Cleanup the previous batch since we are done processing it.
+  for (VectorWrapper v : incoming) {
--- End diff --

If this is original code, then clearly it does the right thing, or else we'd have seen problems by now. So, fine to leave as-is.




[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122324273
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122323606
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122322345
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
+  // Cleanup the previous batch since we are done processing it.
+  for (VectorWrapper v : incoming) {
--- End diff --

Mmmm... this is the original Hash Agg code, hence the two invariants are probably met.




[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122321138
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -154,7 +152,7 @@
   private int cycleNum = 0; // primary, secondary, tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from

-  private class SpilledPartition { public int spilledBatches; public String spillFile /* Path filePath */; int cycleNum; int origPartn; int prevOrigPartn; }
+  private class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; }
--- End diff --

Done, thanks.





[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122320539
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -130,6 +127,7 @@
   private int currentIndex = 0;
   private IterOutcome outcome;
   private int numGroupedRecords = 0;
+  private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
--- End diff --

The getRecordCount() virtual method is called **for each record**! And in some cases this method performs several checks. Unfortunately, other inefficiencies indeed dwarf this saving. A local variable won't work, as execution may return and come back (e.g., after a spill) midway through processing the incoming batch.
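A minimal, self-contained sketch of the pattern under discussion, with a toy batch type standing in for Drill's RecordBatch (the names below are illustrative, not the real API): the count is cached once per batch in a member field so that it survives an early return and a later resume.

class Batch {
  private final int records;
  Batch(int records) { this.records = records; }
  int getRecordCount() { return records; }           // virtual call; the real method may do several checks
}

class Aggregator {
  private int underlyingIndex = 0;
  private int currentBatchRecordCount = 0;           // cached once per batch, survives an early return

  void startBatch(Batch incoming) {
    underlyingIndex = 0;
    currentBatchRecordCount = incoming.getRecordCount();  // one call per batch instead of one per record
  }

  // Returns true when the whole batch was consumed; returning false simulates an
  // early output (e.g. a spill), after which processing resumes from underlyingIndex.
  boolean process(int stopAfter) {
    for (; underlyingIndex < currentBatchRecordCount; underlyingIndex++) {
      if (underlyingIndex == stopAfter) { return false; }
    }
    return true;
  }
}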




[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122313161
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122312907
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122310373
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -154,7 +152,7 @@
   private int cycleNum = 0; // primary, secondary, tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from

-  private class SpilledPartition { public int spilledBatches; public String spillFile /* Path filePath */; int cycleNum; int origPartn; int prevOrigPartn; }
+  private class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; }
--- End diff --

`private static class` since you don't have any methods and so have no use 
for the "inner this" pointer.
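Sketched in isolation, the suggestion amounts to the following (field names copied from the diff above; the enclosing class name is only a placeholder): a static nested class captures no hidden reference to the enclosing instance.

public class EnclosingOperator {                       // placeholder for the enclosing template class
  private static class SpilledPartition {              // static: no "inner this" pointer is captured
    public int spilledBatches;
    public String spillFile;
    int cycleNum;
    int origPartn;
    int prevOrigPartn;
  }

  private final java.util.List<SpilledPartition> spilledPartitions = new java.util.ArrayList<>();
}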




[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122312536
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
+  // Cleanup the previous batch since we are done processing it.
+  for (VectorWrapper v : incoming) {
--- End diff --

Two comments/questions here.

First, can we be sure that there is always a Selection Vector Remover 
between the hash agg and anything that can emit a batch that uses an SV4 (such 
as sort)? Otherwise, the `getValueVector()` method won't work.

Second, is it certain that no other code references the vectors from this 
container?

If both those invariants are met, then, yes, it is the job of this operator 
to release buffers created by the upstream.
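One way to picture the first concern is a defensive variant of the cleanup loop (a sketch, not the PR's code, assuming Drill's standard record-batch accessors): it skips the clear when the incoming batch still carries an SV4 indirection.

import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorWrapper;

final class IncomingCleanup {
  // Release the incoming batch's buffers only when no SV4 is present; with an SV4,
  // a Selection Vector Remover upstream is expected to have materialized the data first.
  static void clearIncoming(RecordBatch incoming) {
    if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) {
      return;
    }
    for (VectorWrapper<?> v : incoming) {
      v.getValueVector().clear();
    }
  }
}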



[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r122310219
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -130,6 +127,7 @@
   private int currentIndex = 0;
   private IterOutcome outcome;
   private int numGroupedRecords = 0;
+  private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
--- End diff --

Not sure that the very small savings in time is worth the complexity of 
keeping a cached copy in sync. If needed for an inner loop, can it be a local 
variable instead?




[GitHub] drill issue #837: DRILL-5514: Enhance VectorContainer to merge two row sets

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on the issue:

https://github.com/apache/drill/pull/837
  
Thanks for the review!

Squashed commits and rebased on master to prepare for commit.




[GitHub] drill issue #837: DRILL-5514: Enhance VectorContainer to merge two row sets

2017-06-15 Thread paul-rogers
Github user paul-rogers commented on the issue:

https://github.com/apache/drill/pull/837
  
Fixed the two comments per CR suggestions. Please review. If acceptable, 
I'll squash the commits in preparation for commit.




[GitHub] drill pull request #854: DRILL-5589: JDBC client crashes after successful au...

2017-06-15 Thread sohami
GitHub user sohami opened a pull request:

https://github.com/apache/drill/pull/854

DRILL-5589: JDBC client crashes after successful authentication if trace logging is enabled

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sohami/drill DRILL-5589

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/854.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #854


commit 0e6de7b798a2a1bf36b5ac0790ce47a41c905470
Author: Sorabh Hamirwasia 
Date:   2017-06-15T18:00:21Z

DRILL-5589: JDBC client crashes after successful authentication if trace 
logging is enabled






[jira] [Created] (DRILL-5590) Drill return IndexOutOfBoundsException when a (Text) file > 4096 rows

2017-06-15 Thread Victor Garcia (JIRA)
Victor Garcia created DRILL-5590:


 Summary: Drill return IndexOutOfBoundsException when a (Text) file > 4096 rows
 Key: DRILL-5590
 URL: https://issues.apache.org/jira/browse/DRILL-5590
 Project: Apache Drill
  Issue Type: Bug
  Components: Functions - Drill
Affects Versions: 1.10.0
 Environment: OS: Oracle Linux Enterprise 7, OSX 10.10.1
JVM: 1.8
Drill Installation type: Embedded or distributed (Cluster, 2 Nodes)
Reporter: Victor Garcia


I describe the storage plugin configuration (named lco) below:
{
  "type": "file",
  "enabled": true,
  "connection": "file:///",
  "config": null,
  "workspaces": {
"root": {
  "location": "/data/source/lco",
  "writable": false,
  "defaultInputFormat": "psv"
}
  },
  "formats": {
"psv": {
  "type": "text",
  "extensions": [
"txt"
  ],
  "extractHeader": true,
  "delimiter": "|"
}
  }
}

Querying a CSV file with 3 columns, when the file has more than 4096 rows (including the header), Drill returns an error, but when I reduce the file to 4095 rows the query works.

The original file has 35M rows, but I tested by reducing the rows until I found the number of rows that produces the error.

The original source file is in this URL 
(http://cfdisat.blob.core.windows.net/lco/l_RFC_2017_05_11_2.txt.gz)
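For reference, a query along these lines reproduces the report (a hedged JDBC sketch, assuming a local Drillbit, the lco storage plugin and root workspace defined above, and the decompressed file placed in the workspace directory):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Drill5590Repro {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement();
         // The pipe-delimited .txt file is read through the psv format of the lco workspace;
         // the error reportedly appears once more than 4096 rows are read.
         ResultSet rs = stmt.executeQuery("SELECT * FROM lco.root.`l_RFC_2017_05_11_2.txt`")) {
      while (rs.next()) {
        // consume rows; the IndexOutOfBoundsException surfaces on the client side
      }
    }
  }
}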


First part of error:

at 
org.apache.drill.exec.rpc.user.QueryResultHandler.resultArrived(QueryResultHandler.java:123)
 [drill-java-exec-1.10.0.jar:1.10.0]
at 
org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:343) 
[drill-java-exec-1.10.0.jar:1.10.0]
at org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:88) 
[drill-java-exec-1.10.0.jar:1.10.0]
at 
org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:274) 
[drill-rpc-1.10.0.jar:1.10.0]
at 
org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:244) 
[drill-rpc-1.10.0.jar:1.10.0]
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
 [netty-codec-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
 [netty-handler-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 [netty-codec-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
 [netty-codec-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
 [netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 

[GitHub] drill pull request #837: DRILL-5514: Enhance VectorContainer to merge two ro...

2017-06-15 Thread bitblender
Github user bitblender commented on a diff in the pull request:

https://github.com/apache/drill/pull/837#discussion_r122287615
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java ---
@@ -162,20 +162,22 @@ private boolean majorTypeEqual(MajorType t1, MajorType t2) {
* Merge two schema to produce a new, merged schema. The caller is responsible
* for ensuring that column names are unique. The order of the fields in the
* new schema is the same as that of this schema, with the other schema's fields
-   * appended in the order defined in the other schema. The resulting selection
-   * vector mode is the same as this schema. (That is, this schema is assumed to
-   * be the main part of the batch, possibly with a selection vector, with the
-   * other schema representing additional, new columns.)
+   * appended in the order defined in the other schema.
+   * 
+   * Merging data with selection vectors is unlikely to be useful, or work well.
--- End diff --

Can you please leave a comment about why this is unlikely to be useful, or 
work well?




[GitHub] drill pull request #837: DRILL-5514: Enhance VectorContainer to merge two ro...

2017-06-15 Thread bitblender
Github user bitblender commented on a diff in the pull request:

https://github.com/apache/drill/pull/837#discussion_r122287096
  
--- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java ---
@@ -110,13 +110,16 @@ public void testContainerMerge() {
 RowSet mergedRs = left.merge(right);
 comparison.verifyAndClear(mergedRs);
 
-// Add a selection vector. Ensure the SV appears in the merged
-// result. Test as a row set since container's don't actually
-// carry the selection vector.
+// Add a selection vector. Merging is forbidden.
--- End diff --

Maybe this can be changed to "// Merging data with a selection vector is forbidden". As is, the comment implies that we are adding a selection vector.




[GitHub] drill issue #846: DRILL-5544: Out of heap running CTAS against text delimite...

2017-06-15 Thread vdiravka
Github user vdiravka commented on the issue:

https://github.com/apache/drill/pull/846
  
Commits are squashed into one.




[GitHub] drill issue #852: DRILL-5587: Validate Parquet blockSize and pageSize config...

2017-06-15 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/852
  
@arina-ielchiieva Done. 




Re: Thinking about Drill 2.0

2017-06-15 Thread Paul Rogers
Hi Uwe,

This is incredibly helpful information! You explanation makes perfect sense.

We work quite a bit with ODBC and JDBC: two interfaces that are very much synchronous and row-based. There are three key challenges in working with Drill:

* Drill results are columnar, requiring a column-to-row translation for xDBC
* Drill uses an asynchronous API, while JDBC and ODBC are synchronous, 
resulting in an async-to-sync API translation.
* The JDBC API is based on the Drill client which requires quite a bit (almost 
all, really) of Drill code.

The thought is to create a new API that serves the need of ODBC and JDBC, but 
without the complexity (while, of course, preserving the existing client for 
other uses.) Said another way, find a way to keep the xDBC interfaces simple so 
that they don’t take quite so much space in the client, and don’t require quite 
so much work to maintain.

The first issue (row vs. columnar) turns out not to be a huge issue; the columnar-to-row translation code exists and works. The real issue is allowing the client to control the size of the data sent from the server. (At present, the server decides the "batch" size, and sometimes the size is huge.) So, we can just focus on controlling batch size (and thus client buffer allocations), but retain the columnar form, even for ODBC and JDBC.
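To make the column-to-row point concrete, here is a minimal sketch of the transposition an xDBC layer has to perform over a columnar batch (the ColumnarBatch interface below is purely illustrative, not Drill's client API):

import java.util.function.Consumer;

interface ColumnarBatch {
  int rowCount();
  int columnCount();
  Object cell(int column, int row);                  // column-major access
}

final class RowAdapter {
  // Walk the batch row by row, handing each materialized row to a synchronous consumer,
  // which is essentially what a JDBC/ODBC cursor needs.
  static void forEachRow(ColumnarBatch batch, Consumer<Object[]> rowConsumer) {
    for (int r = 0; r < batch.rowCount(); r++) {
      Object[] row = new Object[batch.columnCount()];
      for (int c = 0; c < batch.columnCount(); c++) {
        row[c] = batch.cell(c, r);                   // transpose one cell at a time
      }
      rowConsumer.accept(row);
    }
  }
}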

So, for the Pandas use case, does your code allow (or benefit from) multiple 
simultaneous queries over the same connection? Or, since Python seems to be 
only approximately multi-threaded, would a synchronous, columnar API work 
better? Here I just mean, in a single connection, is there a need to run 
multiple concurrent queries, or is the classic 
one-concurrent-query-per-connection model easier for Python to consume?

Another point you raise is that our client-side column format should be Arrow, 
or Arrow-compatible. (That is, either using Arrow code, or the same data format 
as Arrow.) That way users of your work can easily leverage Drill.

This last question raises an interesting issue that I (at least) need to 
understand more clearly. Is Arrow a data format + code? Or, is the data format 
one aspect of Arrow, and the implementation another? Would be great to have a 
common data format, but as we squeeze ever more performance from Drill, we find 
we have to very carefully tune our data manipulation code for the specific 
needs of Drill queries. I wonder how we’d do that if we switched to using 
Arrow’s generic vector implementation code? Has anyone else wrestled with this 
question for your project?

Thanks,

- Paul


> On Jun 15, 2017, at 12:23 AM, Uwe L. Korn  wrote:
> 
> Hello Paul,
> 
> Bringing in a bit of the perspective partly of an Arrow developer but mostly 
> someone that works quite a lot in Python with the respective data libraries 
> there: In Python all (performant) data chrunching work is done on columar 
> representations. While this is partly due to columnar being a more CPU 
> efficient on these tasks, this is also because columnar can be abstracted in 
> a form that you implement all computational work with C/C++ or an LLVM-based 
> JIT while still keeping clear and understandable interfaces in Python. In the 
> end to make an efficient Python support, we will always have to convert into 
> a columnar representation, making row-wise APIs to a system that is 
> internally columnar quite annoying as we have a lot of wastage in the 
> conversion layer. In the case that one would want to provide the ability to 
> support Python UDFs, this would lead to the situation that in most cases the 
> UDF calls will be greatly dominated by the conversion logic.
> 
> For the actual performance differences that this makes, you can have a look 
> at the work that recently is happening in Apache Spark where Arrow is used 
> for the conversion of the result from Spark's internal JVM data structures 
> into typical Python ones ("Pandas DataFrames"). In comparision to the 
> existing conversion, this sees currently a speedup of 40x but will be even 
> higher once further steps are implemented. Julien should be able to provide a 
> link to slides that outline the work better.
> 
> As I'm quite new to Drill, I cannot go into much further details w.r.t. Drill 
> but be aware that for languages like Python, having a columnar API really 
> matters. While Drill integrates with Python at the moment not really as a 
> first class citizen, moving to row-wise APIs won't probably make a difference 
> to the current situation but good columnar APIs would help us to keep the 
> path open for the future.
> 
> Uwe



[GitHub] drill pull request #853: DRILL-5130: Fix explainTerms method for Values node...

2017-06-15 Thread arina-ielchiieva
GitHub user arina-ielchiieva opened a pull request:

https://github.com/apache/drill/pull/853

DRILL-5130: Fix explainTerms method for Values nodes to describe two 
different Values nodes correctly

1. Factor out common logic for DrillValuesRel and ValuesPrel into 
DrillValuesRelBase.
2. Revisit explainTerms to write the full tuples content so that Values nodes with the same row type and count but different values are considered to be different.
3. Minor refactoring.

Details in Jira DRILL-5130.
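Roughly, the refactoring looks like this (a simplified sketch; signatures are abbreviated, and the real classes extend Calcite's Values node inside Drill's planner):

// Common logic shared by the logical and physical Values nodes.
abstract class DrillValuesRelBase {
  protected final String tuplesContent;              // full tuples content, serialized

  protected DrillValuesRelBase(String tuplesContent) {
    this.tuplesContent = tuplesContent;
  }

  // Writing the full tuples content means that two Values nodes with the same row type
  // and row count, but different literals, no longer describe themselves identically.
  public String explainTerms() {
    return "tuples=[" + tuplesContent + "]";
  }
}

class DrillValuesRel extends DrillValuesRelBase {     // logical node
  DrillValuesRel(String tuplesContent) { super(tuplesContent); }
}

class ValuesPrel extends DrillValuesRelBase {         // physical node
  ValuesPrel(String tuplesContent) { super(tuplesContent); }
}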

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arina-ielchiieva/drill DRILL-5130

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/853.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #853


commit 598a44f81cec5fdfc005573c39d790ff1b20eb6b
Author: Arina Ielchiieva 
Date:   2017-06-15T15:03:34Z

DRILL-5130: Fix explainTerms method for Values nodes to describe two 
different Values nodes correctly

1. Factor out common logic for DrillValuesRel and ValuesPrel into 
DrillValuesRelBase.
2. Revisit explainTerms to write full tuples content so consequently Values 
nodes with the same row type and count but different values are considered to 
be different.
3. Minor refactoring.






[GitHub] drill issue #844: DRILL-5538: Create TopProject with validatedNodeType after...

2017-06-15 Thread arina-ielchiieva
Github user arina-ielchiieva commented on the issue:

https://github.com/apache/drill/pull/844
  
@jinfengni updated the PR to create TopProject with validatedNodeType after the PHYSICAL phase instead of removing ProjectRemoveRule completely.




[GitHub] drill issue #852: DRILL-5587: Validate Parquet blockSize and pageSize config...

2017-06-15 Thread arina-ielchiieva
Github user arina-ielchiieva commented on the issue:

https://github.com/apache/drill/pull/852
  
@ppadma, could you please resolve conflicts?




Re: Thinking about Drill 2.0

2017-06-15 Thread Uwe L. Korn
Hello Paul,

Bringing in a bit of the perspective partly of an Arrow developer, but mostly of someone who works quite a lot in Python with the respective data libraries there: in Python all (performant) data crunching work is done on columnar representations. While this is partly because columnar is more CPU-efficient for these tasks, it is also because columnar can be abstracted in a form where all computational work is implemented in C/C++ or an LLVM-based JIT while still keeping clear and understandable interfaces in Python. In the end, to provide efficient Python support, we will always have to convert into a columnar representation, making row-wise APIs to a system that is internally columnar quite annoying, as we have a lot of wastage in the conversion layer. If one wanted to provide the ability to support Python UDFs, this would lead to the situation that in most cases the UDF calls will be greatly dominated by the conversion logic.

For the actual performance difference this makes, you can have a look at the work recently happening in Apache Spark, where Arrow is used to convert results from Spark's internal JVM data structures into typical Python ones ("Pandas DataFrames"). In comparison to the existing conversion, this currently sees a speedup of 40x, and it will be even higher once further steps are implemented. Julien should be able to provide a link to slides that outline the work better.

As I'm quite new to Drill, I cannot go into much more detail w.r.t. Drill, but be aware that for languages like Python, having a columnar API really matters. While Drill does not really integrate with Python as a first-class citizen at the moment, moving to row-wise APIs probably won't make a difference to the current situation, but good columnar APIs would help us keep that path open for the future.

Uwe

> Am 13.06.2017 um 06:11 schrieb Paul Rogers :
> 
> Thanks for the suggestions!
> 
> The issue is only partly Calcite changes. The real challenge for potential 
> contributors is that the Drill storage plugin exposes Calcite mechanisms 
> directly. That is, to write storage plugin, one must know (or, more likely, 
> experiment to learn) the odd set of calls made to the storage plugin, for a 
> group scan, then a sub scan, then this or that. Then, learning those calls, 
> map what you want to do to those calls. In some cases, as Calcite chugs 
> along, it calls the same methods multiple times, so the plugin writer has to 
> be prepared to implement caching to avoid banging on the underlying system 
> multiple times for the same data.
> 
> The key opportunity here is to observe that the current API is at the 
> implementation level: as callbacks from Calcite. (Though, the Drill “easy” 
> storage plugin does hide some of the details.) Instead, we’d like an API at 
> the definition level: that the plugin simply declares that, say, it can 
> return a schema, or can handle certain kinds of filter push-down, etc.
> 
> If we can define that API at the metadata (planning) level, then we can 
> create an adapter between that API and Calcite. Doing so makes it much easier 
> to test the plugin, and isolates the plugin from future code changes as 
> Calcite evolves and improves: the adapter changes but not the plugin metadata 
> API.
> 
> As you suggest, the resulting definition API would be handy to share between 
> projects.
> 
> On the execution side, however, Drill plugins are very specific to Drill’s 
> operator framework, Drill’s schema-on-read mechanism, Drill’s special columns 
> (file metadata, partitions), Drill’s vector “mutators” and so on. Here, any 
> synergy would be with Arrow to define a common “mutator” API so that a “row 
> batch reader” written for one system should work with the other.
> 
> In any case, this kind of sharing is hard to define up front, we might 
> instead keep the discussion going to see what works for Drill, what we can 
> abstract out, and how we can make the common abstraction work for other 
> systems beyond Drill.
> 
> Thanks,
> 
> - Paul
> 
>> On Jun 9, 2017, at 3:38 PM, Julian Hyde  wrote:
>> 
>> 
>>> On Jun 5, 2017, at 11:59 AM, Paul Rogers  wrote:
>>> 
>>> Similarly, the storage plugin API exposes details of Calcite (which seems 
>>> to evolve with each new version), exposes value vector implementations, and 
>>> so on. A cleaner, simpler, more isolated API will allow storage plugins to 
>>> be built faster, but will also isolate them from Drill internals changes. 
>>> Without isolation, each change to Drill internals would require plugin 
>>> authors to update their plugin before Drill can be released.
>> 
>> Sorry you’re getting burned by Calcite changes. We try to minimize impact, 
>> but sometimes it’s difficult to see what you’re breaking.
>> 
>> I like the goal of a stable storage plugin API. Maybe it’s something Drill 
>> and Calcite can