[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r185497944

    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java ---
    @@ -125,7 +125,7 @@ public void testHasPathThrowsDrillRuntimeException() {
         Mockito
           .when(client.getCache().getCurrentData(absPath))
    -      .thenThrow(Exception.class);
    +      .thenThrow(RuntimeException.class);
    --- End diff --

IMO, this method needs to be changed to test `ZookeeperClient.hasPath(String path, boolean consistent)`. It is OK to do it in this PR or in a separate commit/JIRA/PR. If you decide to do it in a separate commit, please file a JIRA.

---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r185383432 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/JarBuilder.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.udf.dynamic; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.ConsoleAppender; +import org.apache.maven.cli.MavenCli; +import org.apache.maven.cli.logging.Slf4jLogger; +import org.codehaus.plexus.DefaultPlexusContainer; +import org.codehaus.plexus.PlexusContainer; +import org.codehaus.plexus.logging.BaseLoggerManager; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +public class JarBuilder { + + private final MavenCli cli; + + public JarBuilder() { +this.cli = new MavenCli() { + @Override + protected void customizeContainer(PlexusContainer container) { +((DefaultPlexusContainer) container).setLoggerManager(new BaseLoggerManager() { + @Override + protected org.codehaus.plexus.logging.Logger createLogger(String s) { +return new Slf4jLogger(setupLogger(JarBuilder.class.getName(), Level.INFO)); + } +}); + } +}; + } + + /** + * Builds jars using embedded maven. Includes files / resources based given pattern, + * otherwise using defaults provided in pom.xml. 
+ * + * @param jarName jar name + * @param projectDir project dir + * @param includeFiles pattern indicating which files should be included + * @param includeResources pattern indicating which resources should be included + * + * @return build exit code, 0 if build was successful + */ + public int build(String jarName, String projectDir, String includeFiles, String includeResources) { +System.setProperty("maven.multiModuleProjectDirectory", projectDir); +List params = new LinkedList<>(); +params.add("clean"); +params.add("package"); +params.add("-DskipTests"); +params.add("-Djar.finalName=" + jarName); +if (includeFiles != null) { + params.add("-Dinclude.files=" + includeFiles); +} +if (includeResources != null) { + params.add("-Dinclude.resources=" + includeResources); +} +return cli.doMain(params.toArray(new String[params.size()]), projectDir, System.out, System.err); + } + + private static Logger setupLogger(String string, Level logLevel) { --- End diff -- Is this necessary? ---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r185385028

    --- Diff: exec/java-exec/src/test/resources/drill-udf/pom.xml ---
    @@ -0,0 +1,90 @@
    +
    +
    +http://maven.apache.org/POM/4.0.0;
    + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
    + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;>
    + 4.0.0
    +
    + org.apache.drill.udf
    + drill-udf
    + 1.0
    +
    +
    +${project.name}
    +1.13.0
    --- End diff --

Is it OK to use an old version? Does Drill support semver API compatibility for UDFs? If yes, how is it enforced? If not, compilation may fail.

---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r185353418

    --- Diff: exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java ---
    @@ -177,7 +177,7 @@ public void run() {
         }
       }
    -  //@Test
    --- End diff --

What is the reason the test was disabled before?

---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r185374827 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java --- @@ -164,121 +166,113 @@ public void testResolveTemporaryTableWithPartialSchema() throws Exception { @Test public void testPartitionByWithTemporaryTables() throws Exception { String temporaryTableName = "temporary_table_with_partitions"; -mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes())); +cleanSessionDirectory(); test("create TEMPORARY table %s partition by (c1) as select * from (" + "select 'A' as c1 from (values(1)) union all select 'B' as c1 from (values(1))) t", temporaryTableName); -checkPermission(temporaryTableName); +checkPermission(); } - @Test(expected = UserRemoteException.class) + @Test public void testCreationOutsideOfDefaultTemporaryWorkspace() throws Exception { -try { - String temporaryTableName = "temporary_table_outside_of_default_workspace"; - test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", temp2_schema, temporaryTableName); -} catch (UserRemoteException e) { - assertThat(e.getMessage(), containsString(String.format( - "VALIDATION ERROR: Temporary tables are not allowed to be created / dropped " + - "outside of default temporary workspace [%s].", DFS_TMP_SCHEMA))); - throw e; -} +String temporaryTableName = "temporary_table_outside_of_default_workspace"; + +thrown.expect(UserRemoteException.class); +thrown.expectMessage(containsString(String.format( +"VALIDATION ERROR: Temporary tables are not allowed to be created / dropped " + +"outside of default temporary workspace [%s].", DFS_TMP_SCHEMA))); + +test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", temp2_schema, temporaryTableName); } - @Test(expected = UserRemoteException.class) + @Test public void testCreateWhenTemporaryTableExistsWithoutSchema() throws Exception { String temporaryTableName = 
"temporary_table_exists_without_schema"; -try { - test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); - test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); -} catch (UserRemoteException e) { - assertThat(e.getMessage(), containsString(String.format( - "VALIDATION ERROR: A table or view with given name [%s]" + - " already exists in schema [%s]", temporaryTableName, DFS_TMP_SCHEMA))); - throw e; -} + +thrown.expect(UserRemoteException.class); +thrown.expectMessage(containsString(String.format( +"VALIDATION ERROR: A table or view with given name [%s]" + +" already exists in schema [%s]", temporaryTableName, DFS_TMP_SCHEMA))); + +test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); +test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); } - @Test(expected = UserRemoteException.class) + @Test public void testCreateWhenTemporaryTableExistsCaseInsensitive() throws Exception { String temporaryTableName = "temporary_table_exists_without_schema"; -try { - test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); - test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName.toUpperCase()); -} catch (UserRemoteException e) { - assertThat(e.getMessage(), containsString(String.format( - "VALIDATION ERROR: A table or view with given name [%s]" + - " already exists in schema [%s]", temporaryTableName.toUpperCase(), DFS_TMP_SCHEMA))); - throw e; -} + +thrown.expect(UserRemoteException.class); +thrown.expectMessage(containsString(String.format( +"VALIDATION ERROR: A table or view with given name [%s]" + --- End diff -- and possibly `expectUserRemoteExceptionWithTableExistsMessage(String tableName, String schemaName)`. ---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r185375540

    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java ---
    @@ -498,47 +489,50 @@ public void testDropTemporaryTableAsViewWithoutException() throws Exception {
             .go();
       }
    -  @Test(expected = UserRemoteException.class)
    +  @Test
       public void testDropTemporaryTableAsViewWithException() throws Exception {
         String temporaryTableName = "temporary_table_to_drop_like_view_with_exception";
         test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
    -    try {
    -      test("drop view %s.%s", DFS_TMP_SCHEMA, temporaryTableName);
    -    } catch (UserRemoteException e) {
    -      assertThat(e.getMessage(), containsString(String.format(
    -          "VALIDATION ERROR: Unknown view [%s] in schema [%s]", temporaryTableName, DFS_TMP_SCHEMA)));
    -      throw e;
    +    thrown.expect(UserRemoteException.class);
    +    thrown.expectMessage(containsString(String.format(
    +        "VALIDATION ERROR: Unknown view [%s] in schema [%s]", temporaryTableName, DFS_TMP_SCHEMA)));
    +
    +    test("drop view %s.%s", DFS_TMP_SCHEMA, temporaryTableName);
    +  }
    +
    +  private static String getSessionId() throws Exception {
    --- End diff --

Consider mocking `getSessionId()` in `UserSession`. This method needs to be tested on its own.

---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r185369981

    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java ---
    @@ -125,7 +125,7 @@ public void testHasPathThrowsDrillRuntimeException() {
         Mockito
           .when(client.getCache().getCurrentData(absPath))
    -      .thenThrow(Exception.class);
    +      .thenThrow(RuntimeException.class);
    --- End diff --

OK, but I am not sure what this method tests. `ZookeeperClient.hasPath(String path)` is not used in production.

---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r185374205 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java --- @@ -164,121 +166,113 @@ public void testResolveTemporaryTableWithPartialSchema() throws Exception { @Test public void testPartitionByWithTemporaryTables() throws Exception { String temporaryTableName = "temporary_table_with_partitions"; -mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes())); +cleanSessionDirectory(); test("create TEMPORARY table %s partition by (c1) as select * from (" + "select 'A' as c1 from (values(1)) union all select 'B' as c1 from (values(1))) t", temporaryTableName); -checkPermission(temporaryTableName); +checkPermission(); } - @Test(expected = UserRemoteException.class) + @Test public void testCreationOutsideOfDefaultTemporaryWorkspace() throws Exception { -try { - String temporaryTableName = "temporary_table_outside_of_default_workspace"; - test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", temp2_schema, temporaryTableName); -} catch (UserRemoteException e) { - assertThat(e.getMessage(), containsString(String.format( - "VALIDATION ERROR: Temporary tables are not allowed to be created / dropped " + - "outside of default temporary workspace [%s].", DFS_TMP_SCHEMA))); - throw e; -} +String temporaryTableName = "temporary_table_outside_of_default_workspace"; + +thrown.expect(UserRemoteException.class); --- End diff -- Consider introducing a new method to set `thrown` and message, something like `void expectUserRemoteExceptionWithMessage(String message)`. ---
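A minimal, JDK-only sketch of the suggested helper, together with the `expectUserRemoteExceptionWithTableExistsMessage(String tableName, String schemaName)` variant proposed in the related comment. JUnit's `ExpectedException` rule is not part of the JDK, so this sketch takes the failing statement as a lambda instead of configuring `thrown`; inside `TestCTTAS` the helper body would presumably be just the `thrown.expect(...)` and `thrown.expectMessage(...)` calls. `UserRemoteException` and `TestAction` below are hypothetical stand-ins, not Drill's classes.

```java
import java.util.Objects;

public class ExpectedExceptionHelpers {

  // Hypothetical stand-in for Drill's UserRemoteException so the sketch compiles with the JDK alone.
  static class UserRemoteException extends RuntimeException {
    UserRemoteException(String message) {
      super(message);
    }
  }

  // Hypothetical functional interface for a statement that may throw, e.g. a test("...") call.
  @FunctionalInterface
  interface TestAction {
    void run() throws Exception;
  }

  // Fails unless the action throws UserRemoteException whose message contains the expected text.
  static void expectUserRemoteExceptionWithMessage(String message, TestAction action) {
    try {
      action.run();
    } catch (UserRemoteException e) {
      String actual = Objects.toString(e.getMessage(), "");
      if (!actual.contains(message)) {
        throw new AssertionError("Expected message containing [" + message + "] but was [" + actual + "]", e);
      }
      return;
    } catch (Exception e) {
      throw new AssertionError("Expected UserRemoteException but got " + e, e);
    }
    throw new AssertionError("Expected UserRemoteException was not thrown");
  }

  // Specialization for the repeated "table already exists" check; the format string is the one used in the diff.
  static void expectUserRemoteExceptionWithTableExistsMessage(String tableName, String schemaName, TestAction action) {
    expectUserRemoteExceptionWithMessage(String.format(
        "VALIDATION ERROR: A table or view with given name [%s] already exists in schema [%s]",
        tableName, schemaName), action);
  }

  public static void main(String[] args) {
    // Simulated failing statement; in a real test this would be the second "create TEMPORARY table" call.
    expectUserRemoteExceptionWithTableExistsMessage("t1", "dfs.tmp", () -> {
      throw new UserRemoteException(
          "VALIDATION ERROR: A table or view with given name [t1] already exists in schema [dfs.tmp]");
    });
    System.out.println("ok");
  }
}
```

Taking the action as a parameter keeps each test to one call per expected failure; the trade-off versus the `thrown` rule is that the failing statement must be wrapped in a lambda.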
[GitHub] drill pull request #1224: DRILL-6321: Customize Drill's conformance. Allow s...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1224#discussion_r185351651

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillConformance.java ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.sql;
    +
    +import org.apache.calcite.sql.validate.SqlConformanceEnum;
    +import org.apache.calcite.sql.validate.SqlDelegatingConformance;
    +
    +/**
    + * Drill's SQL conformance is SqlConformanceEnum.DEFAULT except for method isApplyAllowed().
    + * Since Drill is going to allow OUTER APPLY and CROSS APPLY to allow each row from left child of Join
    + * to join with output of right side (sub-query or table function that will be invoked for each row).
    + * Refer to DRILL-5999 for more information.
    + */
    +public class DrillConformance extends SqlDelegatingConformance {
    --- End diff --

Why not introduce a top-level class when it is actually needed? To override the behavior of a single method, an anonymous class is more than sufficient.

---
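To illustrate the suggestion, here is a JDK-only sketch of overriding one method of a delegating class with an anonymous subclass. `Conformance`, `DelegatingConformance` and `DEFAULT` are hypothetical stand-ins for Calcite's `SqlConformance`, `SqlDelegatingConformance` and `SqlConformanceEnum.DEFAULT`, used only so the snippet compiles without Calcite on the classpath.

```java
public class AnonymousOverrideSketch {

  // Hypothetical stand-in for Calcite's SqlConformance (only two methods shown).
  interface Conformance {
    boolean isApplyAllowed();
    boolean isLiberal();
  }

  // Forwards every call to a delegate, in the spirit of SqlDelegatingConformance.
  static class DelegatingConformance implements Conformance {
    private final Conformance delegate;

    DelegatingConformance(Conformance delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean isApplyAllowed() {
      return delegate.isApplyAllowed();
    }

    @Override
    public boolean isLiberal() {
      return delegate.isLiberal();
    }
  }

  // Hypothetical stand-in for SqlConformanceEnum.DEFAULT: APPLY is not allowed.
  static final Conformance DEFAULT = new Conformance() {
    @Override
    public boolean isApplyAllowed() {
      return false;
    }

    @Override
    public boolean isLiberal() {
      return false;
    }
  };

  // Only one method differs from DEFAULT, so an anonymous subclass is enough; no top-level class.
  static final Conformance DRILL_CONFORMANCE = new DelegatingConformance(DEFAULT) {
    @Override
    public boolean isApplyAllowed() {
      return true; // allow CROSS APPLY / OUTER APPLY
    }
  };

  public static void main(String[] args) {
    System.out.println(DRILL_CONFORMANCE.isApplyAllowed()); // true
    System.out.println(DRILL_CONFORMANCE.isLiberal());      // false, delegated to DEFAULT
  }
}
```

If more overrides are needed later, the anonymous class can be promoted to a named class at that point without changing callers that only see the interface type.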
[GitHub] drill pull request #1224: DRILL-6321: Customize Drill's conformance. Allow s...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1224#discussion_r185225979

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillConformance.java ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.sql;
    +
    +import org.apache.calcite.sql.validate.SqlConformanceEnum;
    +import org.apache.calcite.sql.validate.SqlDelegatingConformance;
    +
    +/**
    + * Drill's SQL conformance is SqlConformanceEnum.DEFAULT except for method isApplyAllowed().
    + * Since Drill is going to allow OUTER APPLY and CROSS APPLY to allow each row from left child of Join
    + * to join with output of right side (sub-query or table function that will be invoked for each row).
    + * Refer to DRILL-5999 for more information.
    + */
    +public class DrillConformance extends SqlDelegatingConformance {
    --- End diff --

If changes to `DrillConformance` are needed later, it can be refactored into a top-level class at that point. For now, I suggest not optimizing for future requirements that may never materialize.

---
[GitHub] drill issue #1236: DRILL-6347: Inconsistent method name "field".
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1236 LGTM. Please squash commits. ---
[GitHub] drill issue #1235: DRILL-6336: Inconsistent method name.
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1235

My take is that "append" is more common for classes with similar functionality; see, for example, `ToStringBuilder`. As there is no added benefit to using "print" over "append", my recommendation is to keep "append" as is and see whether `DebugStringBuilder` can be replaced with `ToStringBuilder`.

---
[GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1237

IMO, it would be good to understand what other operators do as well, for example Project or Filter. Do they take ownership of incoming batches? And if they do, when is ownership taken?

I do not suggest that we change how Sender and Receiver control **all** aspects of communication, at least not as part of this JIRA/PR. The difference between my approach and yours is whether UnorderedReceiver and other receivers are pass-through operators. In my view, receivers are not pass-through operators; they are buffering operators, as they receive batches from the network and buffer them until downstream operators are ready to consume those batches. In your view, receivers are pass-through operators that get batches from the fragment queue (or some other queue) and pass them downstream. As there is no waiting and no processing between getting a batch from the fragment queue and passing it to the next operator, I don't see why a receiver needs to take ownership.

---
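For context on what "taking ownership" means here: per the `transferBodyOwnership` Javadoc in this PR, ownership transfer moves a batch body's memory charge to the target allocator, so that the operator is charged for that memory. The toy model below illustrates only that accounting idea, including the no-op when the target already owns the buffer. `Allocator` and `Buffer` are hypothetical classes; Drill's real ledger and reference-counting logic is considerably more involved.

```java
import java.util.Objects;

public class OwnershipTransferSketch {

  // Hypothetical allocator that only tracks how many bytes it is charged for.
  static final class Allocator {
    final String name;
    long chargedBytes;

    Allocator(String name) {
      this.name = name;
    }
  }

  // Hypothetical buffer whose size is charged to exactly one owning allocator.
  static final class Buffer {
    final int size;
    Allocator owner;

    Buffer(Allocator owner, int size) {
      this.owner = Objects.requireNonNull(owner);
      this.size = size;
      owner.chargedBytes += size;
    }

    // Moves the charge to target; a no-op (returns false) if target already owns the buffer.
    boolean transferOwnership(Allocator target) {
      Objects.requireNonNull(target);
      if (owner == target) {
        return false;
      }
      owner.chargedBytes -= size;
      target.chargedBytes += size;
      owner = target;
      return true;
    }
  }

  public static void main(String[] args) {
    Allocator rpc = new Allocator("rpc");
    Allocator receiver = new Allocator("receiver");
    Buffer body = new Buffer(rpc, 4096);
    body.transferOwnership(receiver);
    System.out.println(rpc.chargedBytes + " " + receiver.chargedBytes); // 0 4096
  }
}
```

In these terms, the disagreement above is about when (or whether) a receiver should perform such a transfer for batches it merely hands to the next operator.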
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184804819

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    --- End diff --

It may throw `AssertException` now, and other exceptions may be added in the future.

---
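Presumably the concern is the usual idiom of calling the "start" step before `try`, so that the `finally` block only ends a wait that actually started; otherwise, if `startWait()` throws, `stopWait()` in `finally` can corrupt the timing state or throw a second exception that masks the first. A JDK-only sketch under that assumption follows. `OperatorStats` and `BatchSource` are hypothetical stand-ins; Drill's real stats class may signal misuse with an assertion rather than `IllegalStateException`.

```java
import java.io.IOException;

public class StartWaitIdiom {

  // Hypothetical stand-in for operator wait-time statistics; it rejects unbalanced calls.
  static class OperatorStats {
    private boolean waiting;
    private int completedWaits;

    void startWait() {
      if (waiting) {
        throw new IllegalStateException("startWait() called while already waiting");
      }
      waiting = true;
    }

    void stopWait() {
      if (!waiting) {
        throw new IllegalStateException("stopWait() called without a matching startWait()");
      }
      waiting = false;
      completedWaits++;
    }

    boolean isWaiting() {
      return waiting;
    }

    int completedWaits() {
      return completedWaits;
    }
  }

  // Hypothetical source of batches, standing in for the fragment queue.
  @FunctionalInterface
  interface BatchSource {
    String next() throws IOException;
  }

  // startWait() is called before try: if it throws, the finally block never runs,
  // so stopWait() cannot end a wait that this call did not start.
  static String getNextBatch(OperatorStats stats, BatchSource source) throws IOException {
    stats.startWait();
    try {
      return source.next();
    } finally {
      stats.stopWait();
    }
  }

  public static void main(String[] args) throws IOException {
    OperatorStats stats = new OperatorStats();
    System.out.println(getNextBatch(stats, () -> "batch-1")); // batch-1
    System.out.println(stats.completedWaits());              // 1
  }
}
```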
[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1238#discussion_r184724657 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.drill.common.exceptions.UserException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion. + * TODO: look at switching to fork join. + * @param The time value that will be returned when the task is executed. 
+ */ +public abstract class TimedCallable implements Callable { + private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class); + + private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000; + + private volatile long startTime = 0; + private volatile long executionTime = -1; + + private static class FutureMapper implements Function<Future, V> { +int count; +Throwable throwable = null; + +private void setThrowable(Throwable t) { + if (throwable == null) { +throwable = t; + } else { +throwable.addSuppressed(t); + } +} + +@Override +public V apply(Future future) { + Preconditions.checkState(future.isDone()); + if (!future.isCancelled()) { +try { + count++; + return future.get(); +} catch (InterruptedException e) { + // there is no wait as we are getting result from the completed/done future + logger.error("Unexpected exception", e); + throw UserException.internalError(e) + .message("Unexpected exception") + .build(logger); +} catch (ExecutionException e) { + setThrowable(e.getCause()); +} + } else { +setThrowable(new CancellationException()); + } + return null; +} + } + + private static class Statistics implements Consumer<TimedCallable> { +final long start = System.nanoTime(); +final Stopwatch watch = Stopwatch.createStarted(); +long totalExecution; +long maxExecution; +int count; +int startedCount; +private int doneCount; +// measure thread creation times +long earliestStart; +long latestStart; +long totalStart; + +@Override +public void accept(TimedCallable task) { + count++; + long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start; + if (threadStart >= 0) { +startedCount++; +earliestStart = Math.min(earliestStart, threadStart); +latestStart = Math.max(latestStart, threadStart); +totalStart += threadStart; +long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS); +if (executionTime != -1) { + done
[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1238#discussion_r184694930 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.drill.common.exceptions.UserException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion. + * TODO: look at switching to fork join. + * @param The time value that will be returned when the task is executed. 
+ */ +public abstract class TimedCallable implements Callable { + private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class); + + private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000; + + private volatile long startTime = 0; + private volatile long executionTime = -1; + + private static class FutureMapper implements Function<Future, V> { +int count; +Throwable throwable = null; + +private void setThrowable(Throwable t) { + if (throwable == null) { +throwable = t; + } else { +throwable.addSuppressed(t); + } +} + +@Override +public V apply(Future future) { + Preconditions.checkState(future.isDone()); + if (!future.isCancelled()) { +try { + count++; + return future.get(); +} catch (InterruptedException e) { + // there is no wait as we are getting result from the completed/done future + logger.error("Unexpected exception", e); + throw UserException.internalError(e) + .message("Unexpected exception") + .build(logger); +} catch (ExecutionException e) { + setThrowable(e.getCause()); +} + } else { +setThrowable(new CancellationException()); + } + return null; +} + } + + private static class Statistics implements Consumer<TimedCallable> { +final long start = System.nanoTime(); +final Stopwatch watch = Stopwatch.createStarted(); +long totalExecution; +long maxExecution; +int count; +int startedCount; +private int doneCount; +// measure thread creation times +long earliestStart; +long latestStart; +long totalStart; + +@Override +public void accept(TimedCallable task) { + count++; + long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start; + if (threadStart >= 0) { +startedCount++; +earliestStart = Math.min(earliestStart, threadStart); +latestStart = Math.max(latestStart, threadStart); +totalStart += threadStart; +long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS); +if (executionTime != -1) { + done
[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1238#discussion_r184691926 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.drill.common.exceptions.UserException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion. + * TODO: look at switching to fork join. + * @param The time value that will be returned when the task is executed. 
+ */ +public abstract class TimedCallable implements Callable { + private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class); + + private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000; + + private volatile long startTime = 0; + private volatile long executionTime = -1; + + private static class FutureMapper implements Function<Future, V> { +int count; +Throwable throwable = null; + +private void setThrowable(Throwable t) { + if (throwable == null) { +throwable = t; + } else { +throwable.addSuppressed(t); + } +} + +@Override +public V apply(Future future) { + Preconditions.checkState(future.isDone()); + if (!future.isCancelled()) { +try { + count++; + return future.get(); +} catch (InterruptedException e) { + // there is no wait as we are getting result from the completed/done future + logger.error("Unexpected exception", e); + throw UserException.internalError(e) + .message("Unexpected exception") + .build(logger); +} catch (ExecutionException e) { + setThrowable(e.getCause()); +} + } else { +setThrowable(new CancellationException()); + } + return null; +} + } + + private static class Statistics implements Consumer<TimedCallable> { +final long start = System.nanoTime(); +final Stopwatch watch = Stopwatch.createStarted(); +long totalExecution = 0; +long maxExecution = 0; +int startedCount = 0; +private int doneCount = 0; +// measure thread creation times +long earliestStart = Long.MAX_VALUE; +long latestStart = 0; +long totalStart = 0; + +@Override +public void accept(TimedCallable task) { + long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start; + if (threadStart >= 0) { +startedCount++; +earliestStart = Math.min(earliestStart, threadStart); +latestStart = Math.max(latestStart, threadStart); +totalStart += threadStart; +long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS); +if (executionTime != -1) { + done
[GitHub] drill issue #1214: DRILL-6331: Revisit Hive Drill native parquet implementat...
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1214

When moving files around, please preserve the history of modifications made to them.

---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184596895 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java --- @@ -77,4 +83,46 @@ public long getByteCount() { public boolean isAckSent() { return ackSent.get(); } + + /** + * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory + * accounting (that is, the operator should be charged with the body's Drillbuf memory). + * + * NOTES - + * + * This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the + * owning allocator or b) the target allocator is already the owner + * When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper + * DrillBuf reference count accounting + * The RPC handling code caches a reference to this RawFragmentBatch object instance; release() + * calls should be routed to the previous DrillBuf + * + * + * @param targetAllocator target allocator + * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has + * been switched to the target allocator); otherwise this operation is a NOOP (current instance + * returned) + */ + public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) { +if (body == null) { + return this; // NOOP +} + +if (!body.getLedger().isOwningLedger() + || body.getLedger().isOwner(targetAllocator)) { + + return this; +} + +int writerIndex = body.writerIndex(); +TransferResult transferResult = body.transferOwnership(targetAllocator); + +// Set the index and increment reference count +transferResult.buffer.writerIndex(writerIndex); + +// Clear the current Drillbuffer since caller will perform release() on the new one +body.release(); + +return new RawFragmentBatch(getHeader(), transferResult.buffer, getSender(), false); --- End diff -- This actually brings a question why `newRawFragmentBatch` is released in 
`IncomingBuffers.batchArrived()` instead of releasing `transferredBuffer` after `RawFragmentBatch` is constructed in `newRawFragmentBatch`. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184590436 --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java --- @@ -253,10 +261,12 @@ public boolean transferBalance(final BufferLedger target) { target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name); } -boolean overlimit = target.allocator.forceAllocate(size); +// Release first to handle the case where the current and target allocators were part of the same +// parent / child tree. allocator.releaseBytes(size); +boolean allocationFit = target.allocator.forceAllocate(size); --- End diff -- In this case, changing the order of `forceAllocate()` and `releaseBytes()` is incorrect as ownership is not changed, but the old owner does not account for that memory anymore. ---
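To illustrate the ordering concern, here is a minimal sketch using hypothetical `SimpleAllocator` and `SimpleLedger` classes (not Drill's `AllocationManager`/`BufferLedger` API). The target is charged first and the source is released afterwards, so the bytes are always accounted to at least one allocator. If charging the target were to throw, the source would still own and account for the memory. Ownership then moves unconditionally, and the over-limit result is returned for the caller to check.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified allocator; NOT Drill's BufferAllocator API.
class SimpleAllocator {
  final String name;
  final long limit;
  final AtomicLong allocated = new AtomicLong();

  SimpleAllocator(String name, long limit) {
    this.name = name;
    this.limit = limit;
  }

  // Always charges the bytes; returns false if the allocator is now over its limit.
  boolean forceAllocate(long size) {
    return allocated.addAndGet(size) <= limit;
  }

  void releaseBytes(long size) {
    allocated.addAndGet(-size);
  }
}

// Hypothetical ledger tracking which allocator is charged for a buffer of `size` bytes.
class SimpleLedger {
  SimpleAllocator owner;
  final long size;

  SimpleLedger(SimpleAllocator owner, long size) {
    this.owner = owner;
    this.size = size;
    owner.forceAllocate(size);
  }

  // Charge the target before releasing the source, so the bytes are never
  // unaccounted. Ownership moves unconditionally; the caller must check the result.
  boolean transferBalance(SimpleAllocator target) {
    if (target == owner) {
      return true;
    }
    boolean fits = target.forceAllocate(size);
    owner.releaseBytes(size);
    owner = target;
    return fits;
  }
}
```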
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184589826 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -201,6 +208,11 @@ public IterOutcome next() { context.getExecutorState().fail(ex); return IterOutcome.STOP; } finally { + + if (batch != null) { +batch.release(); +batch = null; --- End diff -- OK, but note that future changes to the code may also delete the `batch = null` assignment, and that `batch` can still be used after the `release()` call. For example, with the current implementation, `batch.getHeader()` is perfectly valid after the batch has been released. ---
[GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1237 IMO, it is better not to report memory usage at all than to report a wrong number. If incoming batches accumulate in a queue, they should be reported as owned by the receiver. Taking ownership just before passing a batch to the next operator does not sound right to me. I don't think it is necessary to create a new fragment child allocator. The receiver allocator should be used instead of the fragment allocator when an incoming batch is placed into a queue. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184586222 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException { } } + private RawFragmentBatch getNextNotEmptyBatch() throws IOException { --- End diff -- No, there is no need to handle `IOException` twice. There are no other methods that throw `IOException`. `SchemaChangeException` is actually never thrown. Please see the *TODO* comment and DRILL-2933. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184585959 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException { } } + private RawFragmentBatch getNextNotEmptyBatch() throws IOException { +RawFragmentBatch batch; +try { + stats.startWait(); + batch = getNextBatch(); + + // skip over empty batches. we do this since these are basically control messages. + while (batch != null && batch.getHeader().getDef().getRecordCount() == 0 --- End diff -- Please explain why it can't be released. Note that the release is done for the batch that will *not* be returned from the method. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184585775 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException { } } + private RawFragmentBatch getNextNotEmptyBatch() throws IOException { +RawFragmentBatch batch; +try { + stats.startWait(); --- End diff -- Yes, it does matter. If something goes wrong during `startWait()`, there should be no call to `stopWait()`. ---
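A minimal sketch of the `startWait()`/`stopWait()` pairing discussed here, using a hypothetical `WaitStats` class rather than Drill's operator stats. `startWait()` is called before entering `try`, so `stopWait()` runs in `finally` only when `startWait()` has succeeded.

```java
// Hypothetical stand-in for operator wait statistics; NOT Drill's OperatorStats API.
class WaitStats {
  private long waitStart = -1;
  private long totalWaitNanos = 0;
  int stopCalls = 0;

  void startWait() {
    if (waitStart != -1) {
      throw new IllegalStateException("already waiting");
    }
    waitStart = System.nanoTime();
  }

  void stopWait() {
    if (waitStart == -1) {
      throw new IllegalStateException("stopWait() without startWait()");
    }
    totalWaitNanos += System.nanoTime() - waitStart;
    waitStart = -1;
    stopCalls++;
  }

  long totalWaitNanos() {
    return totalWaitNanos;
  }
}

class WaitExample {
  interface Source<T> {
    T next() throws Exception;
  }

  // startWait() sits outside the try: if it throws, stopWait() is never reached.
  // Once startWait() has succeeded, finally always pairs it with stopWait().
  static <T> T timedNext(WaitStats stats, Source<T> source) throws Exception {
    stats.startWait();
    try {
      return source.next();
    } finally {
      stats.stopWait();
    }
  }
}
```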
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184554299 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException { } } + private RawFragmentBatch getNextNotEmptyBatch() throws IOException { --- End diff -- Consider handling `IOException` inside `getNextNotEmptyBatch()`. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184558425 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException { } } + private RawFragmentBatch getNextNotEmptyBatch() throws IOException { +RawFragmentBatch batch; +try { + stats.startWait(); + batch = getNextBatch(); + + // skip over empty batches. we do this since these are basically control messages. + while (batch != null && batch.getHeader().getDef().getRecordCount() == 0 --- End diff -- Consider inverting the condition, something like
```
while (true) {
  RawFragmentBatch batch = getNextBatch();
  if (batch == null) {
    break;
  }
  RecordBatchDef recordBatchDef = batch.getHeader().getDef();
  if (recordBatchDef.getRecordCount() > 0 || (first && recordBatchDef.getFieldCount() > 0)) {
    return batch;
  }
  batch.release();
}
```
---
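A self-contained version of the suggested loop, assuming a hypothetical `Batch` class and a `Supplier` in place of `getNextBatch()`. It shows that every skipped empty batch is released inside the loop, while the returned batch is left for the caller to release.

```java
import java.util.function.Supplier;

// Hypothetical minimal batch; NOT Drill's RawFragmentBatch.
class Batch {
  final int recordCount;
  final int fieldCount;
  boolean released;

  Batch(int recordCount, int fieldCount) {
    this.recordCount = recordCount;
    this.fieldCount = fieldCount;
  }

  void release() {
    released = true;
  }
}

class BatchFilter {
  // Returns the first batch that carries records (or, for the first batch, a schema),
  // releasing every skipped empty batch; returns null when the source is exhausted.
  static Batch nextNotEmpty(Supplier<Batch> source, boolean first) {
    while (true) {
      Batch batch = source.get();
      if (batch == null) {
        return null;
      }
      if (batch.recordCount > 0 || (first && batch.fieldCount > 0)) {
        return batch;
      }
      batch.release(); // a skipped batch is never returned, so release it here
    }
  }
}
```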
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184554436 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException { } } + private RawFragmentBatch getNextNotEmptyBatch() throws IOException { +RawFragmentBatch batch; +try { + stats.startWait(); --- End diff -- Move it outside of the try/finally block. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184559429 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -201,6 +208,11 @@ public IterOutcome next() { context.getExecutorState().fail(ex); return IterOutcome.STOP; } finally { + + if (batch != null) { +batch.release(); +batch = null; --- End diff -- As far as I can see setting `batch` to `null` has no impact here. ---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r184411700 --- Diff: exec/java-exec/pom.xml --- @@ -593,6 +593,48 @@ netty-tcnative ${netty.tcnative.classifier} + + org.apache.maven + maven-embedder + 3.3.9 --- End diff -- Consider using the latest release available. ---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r184245358 --- Diff: pom.xml --- @@ -798,7 +798,7 @@ com.googlecode.jmockit jmockit - 1.3 + 1.7 --- End diff -- Can it be done as a precursor PR? Version 1.7 is quite old too. Can it be upgraded to the latest (org.jmockit:jmockit:1.39)? ---
[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1238 There is not enough information available to debug and/or troubleshoot DRILL-5908. Instead of trying to find bugs in a homegrown solution, I prefer to replace it with Java's out-of-the-box functionality and, at the same time, provide the ability to log enough information to do RCA for DRILL-5908. IMO, there are no unreasonable requests on a PR/JIRA 😄. ---
[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1238 I did not change how tasks (`Runnable` or `Callable`) behave and did not look into converting `Callable/Runnable` to a `ForkJoinTask`. Whether existing tasks can be scheduled recursively or not depends on the nature of those tasks and is outside the scope of this PR. I'd suggest filing a JIRA, if one does not already exist, to look into `Fork/Join` (this is what I would expect from the developer who added the "TODO"). ---
[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1238 This step is necessary to do RCA for DRILL-5908. There are far too many issues with the current implementation to list them in a JIRA or PR, and the major issue is the use of homegrown solutions where Java (or third-party libraries) already provides the required functionality out of the box. There is no need to use `Runnable` instead of `Callable` and then provide custom `Callable` functionality. It is not necessary to wait on a `CountDownLatch` when `ExecutorService` provides the ability to invoke all tasks and return the results when all tasks complete or a timeout expires. ---
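A minimal sketch of that approach using only `java.util.concurrent`. `ExecutorService.invokeAll(tasks, timeout, unit)` waits until all tasks complete or the timeout expires, cancelling any unfinished tasks, and failures are aggregated with `addSuppressed()` in the spirit of the `FutureMapper.setThrowable()` code quoted earlier in this thread. `InvokeAllSketch` and `runAll` are hypothetical names, not part of Drill's `TimedCallable`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllSketch {
  // Runs all tasks and waits until they complete or the timeout expires; no CountDownLatch.
  // Tasks still running when the timeout expires are cancelled by invokeAll itself.
  static <V> List<V> runAll(List<Callable<V>> tasks, int parallelism, long timeoutMs)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Future<V>> futures = pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
      List<V> results = new ArrayList<>();
      Exception failure = null;
      for (Future<V> future : futures) {
        Exception e = null;
        if (future.isCancelled()) {
          e = new CancellationException("task timed out");
        } else {
          try {
            results.add(future.get()); // the future is done, so get() does not block
          } catch (ExecutionException ex) {
            e = ex.getCause() instanceof Exception ? (Exception) ex.getCause() : ex;
          }
        }
        if (e != null) {
          // Keep the first failure and attach the rest as suppressed exceptions.
          if (failure == null) {
            failure = e;
          } else {
            failure.addSuppressed(e);
          }
        }
      }
      if (failure != null) {
        throw failure;
      }
      return results;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Creating a pool per call keeps the sketch self-contained; a real implementation would more likely reuse a shared executor.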
[GitHub] drill issue #1189: DRILL-6282: Update Drill's Metrics dependencies
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1189 LGTM ---
[GitHub] drill pull request #1189: DRILL-6282: Update Drill's Metrics dependencies
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1189#discussion_r184243564 --- Diff: logical/pom.xml --- @@ -85,14 +85,12 @@ - com.codahale.metrics + io.dropwizard.metrics --- End diff -- It is not best practice to rely on a transitive dependency for a compile-scope dependency. I'd recommend declaring the dependency on `io.dropwizard.metrics` explicitly if `java-exec` or `drill-memory-base` uses it. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184159218 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException { public IterOutcome next() { batchLoader.resetRecordCount(); stats.startProcessing(); + +RawFragmentBatch batch = null; try{ - RawFragmentBatch batch; + --- End diff -- Create a function `getNextNotEmptyBatch()` that calls `getNextBatch()` and returns either a non-empty batch or `null`. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184156922 --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java --- @@ -253,10 +261,12 @@ public boolean transferBalance(final BufferLedger target) { target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name); } -boolean overlimit = target.allocator.forceAllocate(size); +// Release first to handle the case where the current and target allocators were part of the same +// parent / child tree. allocator.releaseBytes(size); +boolean allocationFit = target.allocator.forceAllocate(size); --- End diff -- If this happens, isn't there a problem in that the old allocator has already released the memory? In any case, won't the runtime exception cancel the query anyway, so that all allocators are closed? ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184155724 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java --- @@ -77,4 +83,46 @@ public long getByteCount() { public boolean isAckSent() { return ackSent.get(); } + + /** + * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory + * accounting (that is, the operator should be charged with the body's Drillbuf memory). + * + * NOTES - + * + * This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the + * owning allocator or b) the target allocator is already the owner + * When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper + * DrillBuf reference count accounting + * The RPC handling code caches a reference to this RawFragmentBatch object instance; release() + * calls should be routed to the previous DrillBuf + * + * + * @param targetAllocator target allocator + * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has + * been switched to the target allocator); otherwise this operation is a NOOP (current instance + * returned) + */ + public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) { +if (body == null) { + return this; // NOOP +} + +if (!body.getLedger().isOwningLedger() + || body.getLedger().isOwner(targetAllocator)) { + + return this; +} + +int writerIndex = body.writerIndex(); +TransferResult transferResult = body.transferOwnership(targetAllocator); --- End diff -- But what if it is an over limit? ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184154997 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java --- @@ -77,4 +83,46 @@ public long getByteCount() { public boolean isAckSent() { return ackSent.get(); } + + /** + * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory + * accounting (that is, the operator should be charged with the body's Drillbuf memory). + * + * NOTES - + * + * This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the + * owning allocator or b) the target allocator is already the owner + * When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper + * DrillBuf reference count accounting + * The RPC handling code caches a reference to this RawFragmentBatch object instance; release() + * calls should be routed to the previous DrillBuf + * + * + * @param targetAllocator target allocator + * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has + * been switched to the target allocator); otherwise this operation is a NOOP (current instance + * returned) + */ + public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) { +if (body == null) { + return this; // NOOP +} + +if (!body.getLedger().isOwningLedger() + || body.getLedger().isOwner(targetAllocator)) { + + return this; +} + +int writerIndex = body.writerIndex(); +TransferResult transferResult = body.transferOwnership(targetAllocator); + +// Set the index and increment reference count +transferResult.buffer.writerIndex(writerIndex); + +// Clear the current Drillbuffer since caller will perform release() on the new one +body.release(); + +return new RawFragmentBatch(getHeader(), transferResult.buffer, getSender(), false); --- End diff -- I don't see where RawFragmentBatch is cached. 
Isn't it removed from a queue using `poll()`? ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184146733 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -182,13 +184,18 @@ public IterOutcome next() { return IterOutcome.OUT_OF_MEMORY; } + // Transfer the ownership of this raw-batch to this operator for proper memory statistics reporting + batch = batch.transferBodyOwnership(oContext.getAllocator()); + final RecordBatchDef rbd = batch.getHeader().getDef(); final boolean schemaChanged = batchLoader.load(rbd, batch.getBody()); // TODO: Clean: DRILL-2933: That load(...) no longer throws // SchemaChangeException, so check/clean catch clause below. stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount()); batch.release(); + batch = null; --- End diff -- Why can't it be done in finally? ---
[GitHub] drill issue #1189: DRILL-6282: Excluding io.dropwizard.metrics dependencies
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1189 Please update the JIRA, PR, and commit titles and squash the commits. ---
[GitHub] drill pull request #1189: DRILL-6282: Excluding io.dropwizard.metrics depend...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1189#discussion_r184144853 --- Diff: pom.xml --- @@ -1333,6 +1353,12 @@ + --- End diff -- I am not sure why it is necessary to have `com.yammer.metrics` in `dependencyManagement`. ---
[GitHub] drill pull request #1189: DRILL-6282: Excluding io.dropwizard.metrics depend...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1189#discussion_r184144021 --- Diff: logical/pom.xml --- @@ -85,14 +85,12 @@ - com.codahale.metrics + io.dropwizard.metrics --- End diff -- Is it used in logical? ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184050305 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -182,13 +184,18 @@ public IterOutcome next() { return IterOutcome.OUT_OF_MEMORY; } + // Transfer the ownership of this raw-batch to this operator for proper memory statistics reporting + batch = batch.transferBodyOwnership(oContext.getAllocator()); + final RecordBatchDef rbd = batch.getHeader().getDef(); final boolean schemaChanged = batchLoader.load(rbd, batch.getBody()); // TODO: Clean: DRILL-2933: That load(...) no longer throws // SchemaChangeException, so check/clean catch clause below. stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount()); batch.release(); + batch = null; --- End diff -- Can it now be handled in `finally`? ---
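A minimal sketch of releasing in `finally`, with a hypothetical reference-counted batch and functional stand-ins for the batch source and loader (not Drill's `UnorderedReceiverBatch`). The batch is released exactly once on every path, normal or exceptional, so resetting the local variable to `null` becomes unnecessary.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical reference-counted batch; NOT Drill's RawFragmentBatch.
class RefCountedBatch {
  private int refCount = 1;

  void release() {
    if (refCount <= 0) {
      throw new IllegalStateException("released twice");
    }
    refCount--;
  }

  boolean isReleased() {
    return refCount == 0;
  }
}

class ReceiverSketch {
  enum Outcome { OK, NONE, STOP }

  // Fetches a batch, hands it to a loader, and releases it in finally on every path.
  static Outcome next(Supplier<RefCountedBatch> source, Consumer<RefCountedBatch> loader) {
    RefCountedBatch batch = null;
    try {
      batch = source.get();
      if (batch == null) {
        return Outcome.NONE;
      }
      loader.accept(batch); // may throw
      return Outcome.OK;
    } catch (RuntimeException e) {
      return Outcome.STOP; // the operator quoted above also calls fail(ex) before STOP
    } finally {
      if (batch != null) {
        batch.release(); // single release point; no need to reset batch to null
      }
    }
  }
}
```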
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184085544 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException { public IterOutcome next() { batchLoader.resetRecordCount(); stats.startProcessing(); + +RawFragmentBatch batch = null; try{ - RawFragmentBatch batch; + --- End diff -- Consider moving the try block to a separate function. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184049558 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -201,6 +208,11 @@ public IterOutcome next() { context.getExecutorState().fail(ex); return IterOutcome.STOP; } finally { + + if (batch != null) { +batch.release(); +batch = null; --- End diff -- Why is this necessary? ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184070218 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java --- @@ -77,4 +83,46 @@ public long getByteCount() { public boolean isAckSent() { return ackSent.get(); } + + /** + * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory + * accounting (that is, the operator should be charged with the body's Drillbuf memory). + * + * NOTES - + * + * This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the + * owning allocator or b) the target allocator is already the owner + * When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper + * DrillBuf reference count accounting + * The RPC handling code caches a reference to this RawFragmentBatch object instance; release() + * calls should be routed to the previous DrillBuf + * + * + * @param targetAllocator target allocator + * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has + * been switched to the target allocator); otherwise this operation is a NOOP (current instance + * returned) + */ + public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) { +if (body == null) { + return this; // NOOP +} + +if (!body.getLedger().isOwningLedger() + || body.getLedger().isOwner(targetAllocator)) { + + return this; +} + +int writerIndex = body.writerIndex(); +TransferResult transferResult = body.transferOwnership(targetAllocator); + +// Set the index and increment reference count +transferResult.buffer.writerIndex(writerIndex); + +// Clear the current Drillbuffer since caller will perform release() on the new one +body.release(); + +return new RawFragmentBatch(getHeader(), transferResult.buffer, getSender(), false); --- End diff -- Why is it necessary to return new `RawFragmentBatch` instead of setting 
`body` to `transferResult.buffer`? ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184084128 --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java --- @@ -253,10 +261,12 @@ public boolean transferBalance(final BufferLedger target) { target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name); } -boolean overlimit = target.allocator.forceAllocate(size); +// Release first to handle the case where the current and target allocators were part of the same +// parent / child tree. allocator.releaseBytes(size); +boolean allocationFit = target.allocator.forceAllocate(size); --- End diff -- Move `owningLedger = target;` before `return target.allocator.forceAllocate(size)`, since the result of the call is not checked anyway. ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184075517 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java --- @@ -77,4 +83,46 @@ public long getByteCount() { public boolean isAckSent() { return ackSent.get(); } + + /** + * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory + * accounting (that is, the operator should be charged with the body's Drillbuf memory). + * + * NOTES - + * + * This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the + * owning allocator or b) the target allocator is already the owner + * When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper + * DrillBuf reference count accounting + * The RPC handling code caches a reference to this RawFragmentBatch object instance; release() + * calls should be routed to the previous DrillBuf + * + * + * @param targetAllocator target allocator + * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has + * been switched to the target allocator); otherwise this operation is a NOOP (current instance + * returned) + */ + public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) { +if (body == null) { + return this; // NOOP +} + +if (!body.getLedger().isOwningLedger() + || body.getLedger().isOwner(targetAllocator)) { + + return this; +} + +int writerIndex = body.writerIndex(); +TransferResult transferResult = body.transferOwnership(targetAllocator); --- End diff -- There is no check that the transfer is within new allocator limits, is it guaranteed to succeed? ---
[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1237#discussion_r184068798 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java --- @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException { public IterOutcome next() { batchLoader.resetRecordCount(); stats.startProcessing(); + +RawFragmentBatch batch = null; --- End diff -- Is it necessary to release `batch` when it is overwritten on line 167? ---
[GitHub] drill issue #1234: DRILL-5927: Fixed memory leak in TestBsonRecordReader, an...
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1234 LGTM ---
[GitHub] drill pull request #1189: DRILL-6282: Excluding io.dropwizard.metrics depend...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1189#discussion_r183870299 --- Diff: pom.xml --- @@ -1333,6 +1353,12 @@ + --- End diff -- Where is the dependency used? Can it be replaced with `io.dropwizard.metrics`? ---
[GitHub] drill pull request #1189: DRILL-6282: Excluding io.dropwizard.metrics depend...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1189#discussion_r183868650 --- Diff: pom.xml --- @@ -1164,7 +1164,27 @@ io.dropwizard.metrics metrics-core -4.0.2 +4.1.0-rc0 --- End diff -- Define the version as a property, and avoid using an `rc` version unless it provides a significant benefit. ---
[GitHub] drill pull request #1234: DRILL-5927: Fixed memory leak in TestBsonRecordRea...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1234#discussion_r183865249 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java --- @@ -45,21 +46,24 @@ import org.bson.BsonTimestamp; import org.bson.BsonWriter; import org.bson.types.ObjectId; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -public class TestBsonRecordReader extends BaseTestQuery { - private static VectorContainerWriter writer; - private static TestOutputMutator mutator; - private static BsonRecordReader bsonReader; +public class TestBsonRecordReader { + private BufferAllocator allocator; + private VectorContainerWriter writer; + private TestOutputMutator mutator; + private DrillBuf buffer; + private BsonRecordReader bsonReader; - @BeforeClass - public static void setUp() { -BufferAllocator bufferAllocator = getDrillbitContext().getAllocator(); -mutator = new TestOutputMutator(bufferAllocator); + @Before + public void setUp() { +allocator = new RootAllocator(400_000); --- End diff -- Two minor suggestions: - use a `RootAllocator` with an unlimited amount of memory, since limiting it to 400K does not test `BsonRecordReader` and depends more on how vectors are allocated; - instead of working with `DrillBuf` directly, use `BufferManager`. ---
[GitHub] drill pull request #1234: DRILL-5927: Fixed memory leak in TestBsonRecordRea...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1234#discussion_r183779660 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java --- @@ -49,17 +50,20 @@ import org.junit.BeforeClass; import org.junit.Test; -public class TestBsonRecordReader extends BaseTestQuery { +public class TestBsonRecordReader { + private static BufferAllocator allocator; private static VectorContainerWriter writer; private static TestOutputMutator mutator; + private static DrillBuf buffer; private static BsonRecordReader bsonReader; @BeforeClass public static void setUp() { -BufferAllocator bufferAllocator = getDrillbitContext().getAllocator(); -mutator = new TestOutputMutator(bufferAllocator); +allocator = new RootAllocator(9_000_00); --- End diff -- This does not sound right, as the test uses very little data. I'd suggest filing a JIRA to look into why it takes 900 KB (almost 1 MB) to hold the data, and using an unlimited allocator (the same as before). ---
[GitHub] drill pull request #1234: DRILL-5927: Fixed memory leak in TestBsonRecordRea...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1234#discussion_r183747252 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java --- @@ -272,6 +276,9 @@ public static void cleanUp() { } catch (Exception e) { } + +buffer.close(); --- End diff -- IMO, `close()` should be used only within try-with-resources (no explicit calls to `close()`). ---
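A minimal sketch of the try-with-resources pattern, with a hypothetical `TrackedResource` standing in for the allocator and buffer used in the test. The resources are closed automatically in reverse declaration order, including when the body throws, with no explicit `close()` call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical resource standing in for an allocator or buffer; records close order.
class TrackedResource implements AutoCloseable {
  static final List<String> closed = new ArrayList<>();
  final String name;

  TrackedResource(String name) {
    this.name = name;
  }

  @Override
  public void close() {
    closed.add(name);
  }
}

class TryWithResourcesSketch {
  // Both resources are closed automatically, buffer first, then allocator,
  // whether the body returns normally or throws.
  static int useResources(boolean fail) {
    try (TrackedResource allocator = new TrackedResource("allocator");
         TrackedResource buffer = new TrackedResource("buffer")) {
      if (fail) {
        throw new IllegalStateException("test failure");
      }
      return allocator.name.length() + buffer.name.length();
    }
  }
}
```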
[GitHub] drill pull request #1234: DRILL-5927: Fixed memory leak in TestBsonRecordRea...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1234#discussion_r183741516 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java --- @@ -49,17 +50,20 @@ import org.junit.BeforeClass; import org.junit.Test; -public class TestBsonRecordReader extends BaseTestQuery { +public class TestBsonRecordReader { + private static BufferAllocator allocator; --- End diff -- What is the reason `allocator` and the other variables are initialized in `@BeforeClass`? Can they be initialized in `@Before` instead? ---
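JUnit 4 creates a new instance of the test class for every test method and runs `@Before`/`@After` on that instance, so instance fields give each test a fresh fixture. The sketch below mimics that lifecycle without JUnit. `ReaderTestSketch` and its "allocations" list are hypothetical stand-ins for the allocator, writer and reader fields. If the list were `static` and initialized once in a `@BeforeClass`-style method, the second test would observe the first test's leftovers (and state would be shared if tests ran in parallel).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical fixture: each test gets its own "allocator" (here just a list of allocations). */
class ReaderTestSketch {
  // Instance field, not static: initialized per test, like a JUnit 4 @Before method would do.
  private List<String> allocations;

  void setUp() {      // would be annotated @Before
    allocations = new ArrayList<>();
  }

  void tearDown() {   // would be annotated @After
    allocations = null;
  }

  int testWritesTwoRecords() {
    allocations.add("record-1");
    allocations.add("record-2");
    return allocations.size();
  }

  int testWritesOneRecord() {
    allocations.add("record-1");
    return allocations.size();
  }

  /** Mimics JUnit 4: a fresh instance, then setUp, the test and tearDown for every test method. */
  static int run(ToIntFunction<ReaderTestSketch> test) {
    ReaderTestSketch instance = new ReaderTestSketch();
    instance.setUp();
    try {
      return test.applyAsInt(instance);
    } finally {
      instance.tearDown();
    }
  }
}
```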
[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1238#discussion_r183578626 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java --- @@ -66,8 +69,8 @@ private static void checkMagicBytes(FileStatus status, byte[] data, int offset) } public static List getFooters(final Configuration conf, List statuses, int parallelism) throws IOException { -final List<TimedRunnable> readers = Lists.newArrayList(); -List foundFooters = Lists.newArrayList(); +final List<TimedCallable> readers = new ArrayList<>(); +final List foundFooters = new ArrayList<>(); --- End diff -- Please see Guava Java doc [Lists.newArrayList()](http://google.github.io/guava/releases/snapshot/api/docs/com/google/common/collect/Lists.html#newArrayList--): > **Note for Java 7 and later:** this method is now unnecessary and should be treated as deprecated. Instead, use the ArrayList [constructor](https://docs.oracle.com/javase/9/docs/api/java/util/ArrayList.html?is-external=true#ArrayList--) directly, taking advantage of the new ["diamond" syntax](http://goo.gl/iz2Wi). ---
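A small illustration of the diamond form the Guava note recommends. The element type `Callable<String>` is a hypothetical placeholder, since the archived diff lost the generic type arguments of the original lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class DiamondSketch {
  static List<Callable<String>> newReaders() {
    // Java 7+ diamond: the type arguments are inferred from the declared type,
    // so a factory such as Lists.newArrayList() adds nothing here.
    final List<Callable<String>> readers = new ArrayList<>();
    readers.add(() -> "footer-1");
    readers.add(() -> "footer-2");
    return readers;
  }
}
```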
[GitHub] drill pull request #1144: DRILL-6202: Deprecate usage of IndexOutOfBoundsExc...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1144#discussion_r183571786 --- Diff: src/main/resources/checkstyle-config.xml --- @@ -30,10 +30,15 @@ + --- End diff -- IMO the same applies to all run-time exceptions (NPE, CCE, IOBE), and the conversion for NPE is likely the same as for IOBE. Note that the root cause of an IOBE or NPE is most likely a bug, not an end-user error, misconfiguration, invalid input or a lack of resources, so the end user won't be able to "fix" an NPE or IOBE. ---
[GitHub] drill pull request #1234: DRILL-5927: Fixed memory leak in TestBsonRecordRea...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1234#discussion_r183564004 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java --- @@ -49,17 +50,20 @@ import org.junit.BeforeClass; import org.junit.Test; -public class TestBsonRecordReader extends BaseTestQuery { +public class TestBsonRecordReader { + private static BufferAllocator allocator; --- End diff -- What is the reason to define `BufferAllocator` and the other fields as static? What if tests are executed in parallel? ---
[GitHub] drill pull request #1234: DRILL-5927: Fixed memory leak in TestBsonRecordRea...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1234#discussion_r183562766 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java --- @@ -272,6 +276,9 @@ public static void cleanUp() { } catch (Exception e) { } + +buffer.close(); --- End diff -- buffer.release()? ---
[GitHub] drill pull request #1234: DRILL-5927: Fixed memory leak in TestBsonRecordRea...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1234#discussion_r183563218 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java --- @@ -49,17 +50,20 @@ import org.junit.BeforeClass; import org.junit.Test; -public class TestBsonRecordReader extends BaseTestQuery { +public class TestBsonRecordReader { + private static BufferAllocator allocator; private static VectorContainerWriter writer; private static TestOutputMutator mutator; + private static DrillBuf buffer; private static BsonRecordReader bsonReader; @BeforeClass public static void setUp() { -BufferAllocator bufferAllocator = getDrillbitContext().getAllocator(); -mutator = new TestOutputMutator(bufferAllocator); +allocator = new RootAllocator(100_000_000); --- End diff -- Is it necessary to reserve 100 MB for the allocator? Does the test allocate more than 1 KB? ---
[GitHub] drill issue #1235: DRILL-6336: Inconsistent method name.
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1235 There is no need to expose implementation details as part of the class API. Whether `DebugStringBuilder` uses `PrintWriter.print()` or something else to implement `append()` must be hidden from `DebugStringBuilder` consumers. A method name should never depend on implementation details: the implementation may change, but the API should not. ---
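To illustrate the point, here is a hypothetical builder in the spirit of `DebugStringBuilder` (not Drill's actual class): consumers only see `append()`, while the fact that it is backed by a `PrintWriter` over a `StringWriter` stays private and could be swapped for a plain `StringBuilder` without touching any caller.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical sketch: the public method names describe what happens, not how. */
class DebugStringBuilderSketch {
  private final StringWriter strWriter = new StringWriter();
  private final PrintWriter writer = new PrintWriter(strWriter);

  DebugStringBuilderSketch append(String s) {
    writer.print(s); // implementation detail; callers never see PrintWriter
    return this;
  }

  DebugStringBuilderSketch append(int value) {
    writer.print(value);
    return this;
  }

  @Override
  public String toString() {
    writer.flush();
    return strWriter.toString();
  }
}
```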
[GitHub] drill issue #1236: DRILL-6347: Inconsistent method name "field".
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1236 IMO, neither `append` nor `appendField` is a good choice (otherwise it would also be necessary to rename `startNode/endNode` to `appendStart/EndNode`). It should be either `field` or `visitField`, following the visitor design pattern. It may also be good to rename `listField` to the same name used for `Boolean` and `String`. ---
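A minimal sketch of the visitor-style naming suggested above. `NodeVisitor` and `TextNodeVisitor` are hypothetical names, not Drill's API: each callback is named for the event it handles (`startNode`, `field`, `endNode`), and list-valued fields reuse the overloaded `field` name instead of a separate `listField`.

```java
import java.util.List;

/** Hypothetical visitor interface: one verb per event, overloaded by value type. */
interface NodeVisitor {
  void startNode(String name);
  void field(String label, String value);
  void field(String label, boolean value);
  void field(String label, List<String> values); // same name instead of listField()
  void endNode();
}

/** Renders nodes as text; another visitor could render JSON with no API change. */
class TextNodeVisitor implements NodeVisitor {
  private final StringBuilder out = new StringBuilder();

  @Override public void startNode(String name) { out.append(name).append(" {"); }
  @Override public void field(String label, String value) { out.append(' ').append(label).append('=').append(value); }
  @Override public void field(String label, boolean value) { out.append(' ').append(label).append('=').append(value); }
  @Override public void field(String label, List<String> values) { out.append(' ').append(label).append('=').append(values); }
  @Override public void endNode() { out.append(" }"); }

  String result() { return out.toString(); }
}
```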
[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1238#discussion_r183397526 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.drill.common.exceptions.UserException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion. + * TODO: look at switching to fork join. + * @param The time value that will be returned when the task is executed. 
+ */ +public abstract class TimedCallable implements Callable { + private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class); + + private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000; + + private volatile long startTime = 0; + private volatile long executionTime = -1; + + private static class FutureMapper implements Function<Future, V> { +int count; +Throwable throwable = null; + +private void setThrowable(Throwable t) { + if (throwable == null) { +throwable = t; + } else { +throwable.addSuppressed(t); + } +} + +@Override +public V apply(Future future) { + Preconditions.checkState(future.isDone()); + if (!future.isCancelled()) { +try { + count++; + return future.get(); +} catch (InterruptedException e) { + // there is no wait as we are getting result from the completed/done future + logger.error("Unexpected exception", e); + throw UserException.internalError(e) + .message("Unexpected exception") + .build(logger); +} catch (ExecutionException e) { + setThrowable(e.getCause()); +} + } else { +setThrowable(new CancellationException()); + } + return null; +} + } + + private static class Statistics implements Consumer<TimedCallable> { +final long start = System.nanoTime(); +final Stopwatch watch = Stopwatch.createStarted(); +long totalExecution = 0; +long maxExecution = 0; +int startedCount = 0; +private int doneCount = 0; +// measure thread creation times +long earliestStart = Long.MAX_VALUE; +long latestStart = 0; +long totalStart = 0; + +@Override +public void accept(TimedCallable task) { + long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start; + if (threadStart >= 0) { +startedCount++; +earliestStart = Math.min(earliestStart, threadStart); +latestStart = Math.max(latestStart, threadStart); +totalStart += threadStart; +long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS); +if (executionTime != -1) { + done
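The `FutureMapper` in the diff keeps the first task failure and attaches later ones with `addSuppressed()`. A simplified, self-contained sketch of that idea follows; it is not Drill's `TimedCallable` (no per-task timing statistics), and `ParallelRunnerSketch` is a hypothetical name. It relies on the JDK's `ExecutorService.invokeAll(tasks, timeout, unit)`, which waits for all tasks and cancels any that are still running when the timeout expires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ParallelRunnerSketch {

  static <V> List<V> runAll(List<Callable<V>> tasks, int parallelism, long timeoutMs) {
    ExecutorService executor = Executors.newFixedThreadPool(parallelism);
    try {
      // Waits until every task is done or the timeout expires; unfinished tasks are cancelled.
      List<Future<V>> futures = executor.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
      List<V> results = new ArrayList<>();
      Throwable failure = null;
      for (Future<V> future : futures) {
        Throwable t = null;
        if (future.isCancelled()) {
          t = new CancellationException("task did not finish before the timeout");
        } else {
          try {
            results.add(future.get()); // does not block: the future is already done
          } catch (ExecutionException e) {
            t = e.getCause();
          }
        }
        if (t != null) {
          if (failure == null) {
            failure = t;               // first failure becomes the primary cause
          } else if (t != failure) {
            failure.addSuppressed(t);  // later failures are kept, not lost
          }
        }
      }
      if (failure != null) {
        throw new RuntimeException("parallel task(s) failed", failure);
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("interrupted while waiting for tasks", e);
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Because `invokeAll` returns futures in the same order as the input tasks, results and the "first" failure are reported in task order rather than completion order.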
[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1238 @parthchandra Please review. Note to a committer: please do *not* squash commits. ---
[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable
GitHub user vrozov opened a pull request: https://github.com/apache/drill/pull/1238 DRILL-6281: Refactor TimedRunnable You can merge this pull request into a Git repository by running: $ git pull https://github.com/vrozov/drill DRILL-6281 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/1238.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1238 commit c523734a563c62f58ea1e7161ad366777ba62035 Author: Vlad Rozov <vrozov@...> Date: 2018-04-21T01:00:20Z DRILL-6281: Refactor TimedRunnable (rename TimedRunnable to TimedCallable) commit 63a483b393450e95d09911d756caf670f0a1fdb6 Author: Vlad Rozov <vrozov@...> Date: 2018-04-21T15:07:42Z DRILL-6281: Refactor TimedRunnable ---
[GitHub] drill issue #1225: DRILL-6272: Refactor dynamic UDFs and function initialize...
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1225 @arina-ielchiieva I don't see why using the [maven embedder](http://maven.apache.org/ref/3.5.2/maven-embedder) is a less preferable option. With Maven, it is possible to create source jars using the standard Maven plugin. IMO, it will be easier to modify the test UDF project if necessary, and it is still possible to debug and run tests using an IDE or Maven. There will be no dependency on a build tool; it is a dependency on an external jar, similar to any other external jar dependency. ---
[GitHub] drill issue #1213: DRILL-6334: Minor code cleanup
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1213 LGTM ---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r182902596 --- Diff: exec/java-exec/src/test/resources/udf/dynamic/CustomAbsFunctionTemplate --- @@ -0,0 +1,45 @@ +package org.apache.drill.udf.dynamic; --- End diff -- Apache license ---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r182902723 --- Diff: exec/java-exec/src/test/resources/udf/dynamic/CustomAbsFunctionTemplate --- @@ -0,0 +1,45 @@ +package org.apache.drill.udf.dynamic; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.expr.DrillSimpleFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.holders.VarCharHolder; + +import javax.inject.Inject; + +@FunctionTemplate( +name="abs", +scope= FunctionTemplate.FunctionScope.SIMPLE, +nulls = FunctionTemplate.NullHandling.NULL_IF_NULL +) +public class Abs implements DrillSimpleFunc { + + @Param + VarCharHolder input1; + + @Param + VarCharHolder input2; + + @Output + VarCharHolder out; + + @Inject + DrillBuf buffer; + + public void setup() { + + } + + public void eval() { +String inputString1 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input1.start, input1.end, input1.buffer); +String inputString2 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input2.start, input2.end, input2.buffer); +String outputValue = String.format("ABS was overloaded. Input: %s, %s", inputString1, inputString2); + +out.buffer = buffer; +out.start = 0; +out.end = outputValue.getBytes().length; +buffer.setBytes(0, outputValue.getBytes()); + } +} --- End diff -- LF ---
[GitHub] drill pull request #1225: DRILL-6272: Refactor dynamic UDFs and function ini...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1225#discussion_r182902184 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/FunctionInitializerTest.java --- @@ -53,17 +56,27 @@ @Category(SqlFunctionTest.class) public class FunctionInitializerTest { - private static final String CLASS_NAME = "com.drill.udf.CustomLowerFunction"; + private static final String CLASS_NAME = "org.apache.drill.udf.dynamic.CustomLowerFunction"; private static URLClassLoader classLoader; + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + @BeforeClass public static void init() throws Exception { -Path jars = TestTools.WORKING_PATH - .resolve(TestTools.TEST_RESOURCES) - .resolve("jars"); -String binaryName = "DrillUDF-1.0.jar"; -String sourceName = JarUtil.getSourceName(binaryName); -URL[] urls = {jars.resolve(binaryName).toUri().toURL(), jars.resolve(sourceName).toUri().toURL()}; +String binaryName = "DrillUDF-1.0"; +URL template = ClassLoader.getSystemClassLoader().getResource("udf/dynamic/CustomLowerFunctionTemplate"); +assert template != null; --- End diff -- use junit assert in unit tests. ---
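Why the plain `assert` in the test setup is a weak guard: Java `assert` statements are skipped unless the JVM runs with `-ea`, so a missing template would surface later as a confusing failure. JUnit's `org.junit.Assert.assertNotNull(message, object)` always executes. The JUnit-free sketch below (hypothetical `AssertSketch`) shows both behaviors.

```java
class AssertSketch {
  /** Returns true only when the JVM was started with assertions enabled (-ea). */
  static boolean assertionsEnabled() {
    boolean enabled = false;
    assert enabled = true; // this assignment only runs when assertions are enabled
    return enabled;
  }

  /** Always-on null check, similar in spirit to JUnit's assertNotNull(message, object). */
  static <T> T requirePresent(T resource, String message) {
    if (resource == null) {
      throw new AssertionError(message);
    }
    return resource;
  }
}
```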
[GitHub] drill pull request #1224: DRILL-6321: Customize Drill's conformance. Allow s...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1224#discussion_r182874214 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillConformance.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql; + +import org.apache.calcite.sql.validate.SqlConformanceEnum; +import org.apache.calcite.sql.validate.SqlDelegatingConformance; + +/** + * Drill's SQL conformance is SqlConformanceEnum.DEFAULT except for method isApplyAllowed(). + * Since Drill is going to allow OUTER APPLY and CROSS APPLY to allow each row from left child of Join + * to join with output of right side (sub-query or table function that will be invoked for each row). + * Refer to DRILL-5999 for more information. + */ +public class DrillConformance extends SqlDelegatingConformance { --- End diff -- Is it necessary to have `DrillConformance` as a top level public class? Consider using an anonymous class directly in `DrillParserConfig`. ---
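A sketch of the anonymous-class alternative, using hypothetical stand-ins (`Conformance`, `DelegatingConformance`, `ParserConfigSketch`) so it runs without Calcite. With Calcite, the analogous code would presumably be an anonymous subclass of `SqlDelegatingConformance` wrapping `SqlConformanceEnum.DEFAULT` and overriding `isApplyAllowed()`, created where the parser config is built instead of being a top-level public class.

```java
/** Hypothetical stand-ins for a conformance interface and its delegating base class. */
interface Conformance {
  boolean isApplyAllowed();
  boolean isLiberal();
}

class DefaultConformance implements Conformance {
  @Override public boolean isApplyAllowed() { return false; }
  @Override public boolean isLiberal() { return false; }
}

class DelegatingConformance implements Conformance {
  private final Conformance delegate;

  DelegatingConformance(Conformance delegate) { this.delegate = delegate; }

  @Override public boolean isApplyAllowed() { return delegate.isApplyAllowed(); }
  @Override public boolean isLiberal() { return delegate.isLiberal(); }
}

class ParserConfigSketch {
  // The single override lives next to its only user instead of in a new public class.
  private static final Conformance CONFORMANCE =
      new DelegatingConformance(new DefaultConformance()) {
        @Override
        public boolean isApplyAllowed() {
          return true; // allow CROSS APPLY / OUTER APPLY (see DRILL-5999)
        }
      };

  Conformance conformance() { return CONFORMANCE; }
}
```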
[GitHub] drill pull request #1213: DRILL-6334: Minor code cleanup
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1213#discussion_r182590683 --- Diff: exec/vector/src/test/java/org/apache/drill/exec/vector/VariableLengthVectorTest.java --- @@ -37,6 +37,7 @@ public void testSettingSameValueCount() { try (RootAllocator allocator = new RootAllocator(10_000_000)) { final MaterializedField field = MaterializedField.create("stringCol", Types.required(TypeProtos.MinorType.VARCHAR)); + @SuppressWarnings("resource") --- End diff -- @paul-rogers Thank you for making the changes. Can you please double check whether it is necessary to suppress the warning here. It is confusing why the same warning suppression is removed in some places and added in others. ---
[GitHub] drill pull request #1213: Minor code cleanup
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1213#discussion_r182445689 --- Diff: exec/vector/src/test/java/org/apache/drill/exec/vector/VariableLengthVectorTest.java --- @@ -1,4 +1,4 @@ -/** +/* --- End diff -- The same as the other license header ---
[GitHub] drill pull request #1213: Minor code cleanup
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1213#discussion_r182440952 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java --- @@ -1,4 +1,4 @@ -/** +/* --- End diff -- All license headers should be already fixed as part of @ilooner PR #1207 or #1215. ---
[GitHub] drill pull request #1213: Minor code cleanup
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1213#discussion_r182445146 --- Diff: exec/vector/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java --- @@ -27,6 +27,7 @@ * {@link RecordBatch#next() iteration}. * */ +@SuppressWarnings("serial") --- End diff -- Would it be better to provide a `serialVersionUID` instead of suppressing the warning? ---
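Declaring `serialVersionUID` explicitly removes the cause of the "serial" warning instead of hiding it, and keeps the serialized form's version stable across recompilations (otherwise the JVM computes a default ID from class details, which can change). `OversizedAllocationSketchException` is a hypothetical class modeled on the one in the diff; the round trip below just demonstrates that it serializes cleanly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/** Hypothetical exception: an explicit serialVersionUID instead of @SuppressWarnings("serial"). */
class OversizedAllocationSketchException extends RuntimeException {
  private static final long serialVersionUID = 1L;

  OversizedAllocationSketchException(String message) {
    super(message);
  }
}

class SerializationSketch {
  /** Serializes and deserializes an object in memory; wraps checked I/O failures. */
  static Object roundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```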
[GitHub] drill pull request #1213: Minor code cleanup
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1213#discussion_r182444596 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java --- @@ -33,6 +33,7 @@ public void list() throws Exception { BufferAllocator allocator = RootAllocatorFactory.newRoot(DrillConfig.create()); TestOutputMutator output = new TestOutputMutator(allocator); +@SuppressWarnings("resource") --- End diff -- Why is it necessary to suppress resource not being closed warning here and OK to remove warning suppression in other places? ---
[GitHub] drill pull request #1213: Minor code cleanup
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1213#discussion_r182445595 --- Diff: exec/vector/src/main/java/org/apache/drill/exec/util/CallBack.java --- @@ -1,4 +1,4 @@ -/** +/* --- End diff -- The same as the other license header ---
[GitHub] drill issue #1193: can drill support minio s3 storage
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1193 Can one of the PMC members ask INFRA to close the PR? ---
[GitHub] drill issue #1196: DRILL-6286: Fixed incorrect reference to shutdown in dril...
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1196 +1 ---
[GitHub] drill pull request #1215: DRILL-6338: Do not skip license maven plugin when ...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1215#discussion_r182259451 --- Diff: protocol/pom.xml --- @@ -149,8 +152,12 @@ com.mycila license-maven-plugin + --- End diff -- Do not force ``. It should already be `false`. ---
[GitHub] drill issue #1208: DRILL-6295: PartitionerDecorator may close partitioners w...
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1208 @parthchandra Please review ---
[GitHub] drill pull request #1213: Minor code cleanup
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1213#discussion_r181932078 --- Diff: exec/vector/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java --- @@ -1,4 +1,4 @@ -/** +/* --- End diff -- Should be already handled by Tim's PR #1207. ---
[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1208#discussion_r181927070 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java --- @@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) { } @Override -public void execute(Partitioner part) throws IOException { +public void execute(Partitioner part) throws IOException, InterruptedException { part.flushOutgoingBatches(isLastBatch, schemaChanged); } } /** - * Helper class to wrap Runnable with customized naming - * Exception handling + * Helper class to wrap Runnable with cancellation and waiting for completion support * */ - private static class CustomRunnable implements Runnable { + private static class PartitionerTask implements Runnable { + +private enum STATE { + NEW, + COMPLETING, + NORMAL, + EXCEPTIONAL, + CANCELLED, + INTERRUPTING, + INTERRUPTED +} + +private final AtomicReference state; +private final AtomicReference runner; +private final PartitionerDecorator partitionerDecorator; +private final AtomicInteger count; -private final String parentThreadName; -private final CountDownLatch latch; private final GeneralExecuteIface iface; -private final Partitioner part; +private final Partitioner partitioner; private CountDownLatchInjection testCountDownLatch; -private volatile IOException exp; +private volatile ExecutionException exception; -public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface, -final Partitioner part, CountDownLatchInjection testCountDownLatch) { - this.parentThreadName = parentThreadName; - this.latch = latch; +public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) { + state = new AtomicReference<>(STATE.NEW); + runner = new AtomicReference<>(); + 
this.partitionerDecorator = partitionerDecorator; this.iface = iface; - this.part = part; + this.partitioner = partitioner; + this.count = count; this.testCountDownLatch = testCountDownLatch; } @Override public void run() { - // Test only - Pause until interrupted by fragment thread - try { -testCountDownLatch.await(); - } catch (final InterruptedException e) { -logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e); - } - - final Thread currThread = Thread.currentThread(); - final String currThreadName = currThread.getName(); - final OperatorStats localStats = part.getStats(); - try { -final String newThreadName = parentThreadName + currThread.getId(); -currThread.setName(newThreadName); + final Thread thread = Thread.currentThread(); + if (runner.compareAndSet(null, thread)) { +final String name = thread.getName(); +thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId())); +final OperatorStats localStats = partitioner.getStats(); localStats.clear(); localStats.startProcessing(); -iface.execute(part); - } catch (IOException e) { -exp = e; - } finally { -localStats.stopProcessing(); -currThread.setName(currThreadName); -latch.countDown(); +ExecutionException executionException = null; +try { + // Test only - Pause until interrupted by fragment thread + testCountDownLatch.await(); + if (state.get() == STATE.NEW) { +iface.execute(partitioner); + } +} catch (InterruptedException e) { + if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) { +logger.warn("Partitioner Task interrupted during the run", e); + } +} catch (Throwable t) { + executionException = new ExecutionException(t); +} finally { + if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) { +if (executionException == null) { + localStats.stopProcessing(); + state.lazySet(STATE.NORMAL); +} else { +
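The `PartitionerTask` in the diff uses an `AtomicReference` state with `compareAndSet` so that completion and cancellation cannot both win. A simplified sketch of that idea follows; `CancellableTaskSketch` is hypothetical, keeps only a subset of the states, and does not interrupt a running thread the way the diff's `runner`/`INTERRUPTING` handling does.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

class CancellableTaskSketch implements Runnable {
  enum State { NEW, COMPLETING, NORMAL, EXCEPTIONAL, CANCELLED }

  private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
  private final Runnable body;
  private volatile ExecutionException exception;

  CancellableTaskSketch(Runnable body) {
    this.body = body;
  }

  @Override
  public void run() {
    if (state.get() != State.NEW) {
      return; // cancelled before it started: the body is never executed
    }
    Throwable failure = null;
    try {
      body.run();
    } catch (Throwable t) {
      failure = t;
    }
    // Only the winner of NEW -> COMPLETING publishes the outcome; if cancel()
    // already moved the state, the task stays CANCELLED.
    if (state.compareAndSet(State.NEW, State.COMPLETING)) {
      if (failure == null) {
        state.set(State.NORMAL);
      } else {
        exception = new ExecutionException(failure);
        state.set(State.EXCEPTIONAL);
      }
    }
  }

  /** Succeeds only if the task has not started completing. */
  boolean cancel() {
    return state.compareAndSet(State.NEW, State.CANCELLED);
  }

  State state() { return state.get(); }

  ExecutionException exception() { return exception; }
}
```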
[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1208#discussion_r181926928 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java --- @@ -161,8 +161,11 @@ public OperatorStats getStats() { * @param schemaChanged true if the schema has changed */ @Override - public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException { + public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException, InterruptedException { for (OutgoingRecordBatch batch : outgoingBatches) { + if (Thread.interrupted()) { +throw new InterruptedException(); --- End diff -- I'll revert back throwing 'InterruptedException' to avoid mishandling of the last batch. ---
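A minimal sketch of the pattern in this diff: checking the interrupt flag between batches and throwing `InterruptedException`. `FlushSketch` is hypothetical and uses strings for batches. It also shows the trade-off behind the comment: once the exception is thrown, the remaining batches, including the last one, are never flushed.

```java
import java.util.ArrayList;
import java.util.List;

class FlushSketch {
  static List<String> flushAll(List<String> batches) throws InterruptedException {
    List<String> flushed = new ArrayList<>();
    for (String batch : batches) {
      // Thread.interrupted() clears the flag, so the exception now carries the signal.
      if (Thread.interrupted()) {
        throw new InterruptedException(); // later batches (including the last) are skipped
      }
      flushed.add(batch);
    }
    return flushed;
  }
}
```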
[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1208#discussion_r181865979 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java --- @@ -118,105 +127,114 @@ public PartitionOutgoingBatch getOutgoingBatches(int index) { return null; } - @VisibleForTesting - protected List getPartitioners() { + List getPartitioners() { return partitioners; } /** * Helper to execute the different methods wrapped into same logic * @param iface - * @throws IOException + * @throws ExecutionException */ - protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOException { -if (partitioners.size() == 1 ) { - // no need for threads - final OperatorStats localStatsSingle = partitioners.get(0).getStats(); - localStatsSingle.clear(); - localStatsSingle.startProcessing(); + @VisibleForTesting + void executeMethodLogic(final GeneralExecuteIface iface) throws ExecutionException { +// To simulate interruption of main fragment thread and interrupting the partition threads, create a +// CountDownInject latch. Partitioner threads await on the latch and main fragment thread counts down or +// interrupts waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads. +try (CountDownLatchInjection testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch")) { --- End diff -- The `testCountDownLatch` is used only for testing and initialized to 1. The wait is on `count`. ---
[GitHub] drill issue #1207: DRILL-6320: Fixed License Headers
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1207 LGTM. @ilooner, please double-check that the Apache source distribution can be built with `-Drat.skip=false -Dlicense.skip=false`: - build with `-P apache.release -Dgpg.skip=true` - extract the created source .tar.gz or .zip to a temp directory - build with apache-rat and the license check enabled (you may need to add more files to the license check exclusion). ---
[GitHub] drill pull request #1207: DRILL-6320: Fixed License Headers
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1207#discussion_r181434241 --- Diff: pom.xml --- @@ -198,6 +200,78 @@ + +org.apache.rat +apache-rat-plugin +0.12 + + --- End diff -- I refer to `validate`. ---
[GitHub] drill pull request #1207: DRILL-6320: Fixed License Headers
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1207#discussion_r181267547 --- Diff: pom.xml --- @@ -198,6 +200,78 @@ + +org.apache.rat +apache-rat-plugin +0.12 + + +rat-checks +validate + + check + + + + + ${rat.skip} --- End diff -- Try with `${rat.skip}` removed. Keep `true` in properties. ---
[GitHub] drill issue #1207: DRILL-6320: Fixed License Headers
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1207 @arina-ielchiieva In Apex we use a special git user, Apex Dev `d...@apex.apache.org`, to make format-only changes. I'd recommend doing the same for Drill. ---
[GitHub] drill pull request #1207: DRILL-6320: Fixed License Headers
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1207#discussion_r181197514 --- Diff: pom.xml --- @@ -198,6 +200,78 @@ + +org.apache.rat +apache-rat-plugin +0.12 + + +rat-checks +validate + + check + + + + + ${rat.skip} --- End diff -- Is this necessary? Should not `` use `rat.skip` property by default? ---
[GitHub] drill pull request #1207: DRILL-6320: Fixed License Headers
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1207#discussion_r181198668 --- Diff: pom.xml --- @@ -375,12 +449,12 @@ - com.mycila license-maven-plugin 3.0 + ${license.skip} --- End diff -- Is the `skip` necessary? ---
[GitHub] drill pull request #1207: DRILL-6320: Fixed License Headers
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1207#discussion_r181196340 --- Diff: pom.xml --- @@ -198,6 +200,78 @@ + +org.apache.rat +apache-rat-plugin +0.12 --- End diff -- Do not specify the version explicitly, the one from the parent Apache pom should be used. ---
[GitHub] drill pull request #1207: DRILL-6320: Fixed License Headers
Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1207#discussion_r181197234 --- Diff: pom.xml --- @@ -198,6 +200,78 @@ + +org.apache.rat +apache-rat-plugin +0.12 + + --- End diff -- Is this necessary? Should not `apache-rat:check` be already bound to the `validate` phase? ---
[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...
GitHub user vrozov opened a pull request: https://github.com/apache/drill/pull/1208 DRILL-6295: PartitionerDecorator may close partitioners while CustomRunnable are active during query cancellation @ilooner Please review You can merge this pull request into a Git repository by running: $ git pull https://github.com/vrozov/drill DRILL-6295 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/1208.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1208 commit 20c917461536d14bc752c3085fad8799a107f6cc Author: Vlad Rozov <vrozov@...> Date: 2018-04-11T17:12:07Z DRILL-6295: PartitionerDecorator may close partitioners while CustomRunnable are active during query cancellation ---
[GitHub] drill issue #1193: can drill support minio s3 storage
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1193 @rongfengliang If you have a question regarding Drill, ask it on `u...@drill.apache.org`. If you have something to contribute, please follow the contribution guidelines to open a PR. ---
[GitHub] drill issue #1207: DRILL-6320: Fixed License Headers
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1207 -1 on removing the apache-rat Maven plugin. It is the Apache plugin that invokes the "Release Audit Tool". Other plugins that do a better job of checking license headers can be added in addition to apache-rat. Both apache-rat and any other license verification plugins can be disabled by default (this allows running `mvn -T 2C clean install` if necessary) and enabled only in Travis CI. ---
[GitHub] drill issue #1202: DRILL-6311: No logging information in drillbit.log / dril...
Github user vrozov commented on the issue: https://github.com/apache/drill/pull/1202 LGTM +1. ---