[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203102601
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203102303
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203102022
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
 
 Review comment:
   This is one of the comments I'd definitely like more clarity on before 
merging.





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203102106
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203101307
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -180,18 +113,17 @@ public boolean resume(final String id)
 
   if (response.getStatus().equals(HttpResponseStatus.OK)) {
 log.info("Task [%s] paused successfully", id);
-return jsonMapper.readValue(response.getContent(), new 
TypeReference>()
+return deserialize(response.getContent(), new 
TypeReference>()
 
 Review comment:
   This and a few other comments are about whether the type contract can be 
hardened a little. Having type references with deeply nested types always 
worries me for future changes, where a change to a field type in one class 
can have unexpected knock-on effects. When those fields are explicit in a 
class somewhere, or the class is effectively an alias for the type, it helps 
prevent unexpected changes from showing up in odd places in the code.
   
   It is easier to find `KafkaIndexCheckpointMap` than `Map` when grepping 
the code.
   
   ((This suggestion is optional and more of a discussion so I can better 
understand the limitations of where these values are intended to be used.))
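   For illustration, a minimal sketch of such an alias class (the key/value 
types below are assumptions for the example, not necessarily the actual Kafka 
checkpoint types):

    import java.util.Map;
    import java.util.TreeMap;

    // Hypothetical named alias for a deeply nested checkpoint map. A change to the
    // element types becomes a single, visible edit on this class instead of many
    // scattered TypeReference declarations, and "KafkaIndexCheckpointMap" is easy to grep.
    public class KafkaIndexCheckpointMap extends TreeMap<Integer, Map<Integer, Long>>
    {
    }

   Jackson can then deserialize straight into `KafkaIndexCheckpointMap.class`, 
with no nested `TypeReference` needed at the call site.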





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203100365
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -21,80 +21,32 @@
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.indexer.TaskLocation;
-import io.druid.indexer.TaskStatus;
-import io.druid.indexing.common.RetryPolicy;
-import io.druid.indexing.common.RetryPolicyConfig;
-import io.druid.indexing.common.RetryPolicyFactory;
+import io.druid.indexing.common.IndexTaskClient;
 import io.druid.indexing.common.TaskInfoProvider;
-import io.druid.java.util.common.IAE;
-import io.druid.java.util.common.IOE;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
-import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.FullResponseHandler;
 import io.druid.java.util.http.client.response.FullResponseHolder;
-import io.druid.segment.realtime.firehose.ChatHandlerResource;
-import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
-import org.joda.time.Period;
 
-import javax.ws.rs.core.MediaType;
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
 
-public class KafkaIndexTaskClient
+public class KafkaIndexTaskClient extends IndexTaskClient
 {
-  public static class NoTaskLocationException extends RuntimeException
-  {
-public NoTaskLocationException(String message)
-{
-  super(message);
-}
-  }
-
-  public static class TaskNotRunnableException extends RuntimeException
-  {
-public TaskNotRunnableException(String message)
-{
-  super(message);
-}
-  }
-
-  public static final int MAX_RETRY_WAIT_SECONDS = 10;
-
-  private static final int MIN_RETRY_WAIT_SECONDS = 2;
   private static final EmittingLogger log = new 
EmittingLogger(KafkaIndexTaskClient.class);
-  private static final String BASE_PATH = "/druid/worker/v1/chat";
-  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
-  private static final TreeMap EMPTY_TREE_MAP = new TreeMap();
-
-  private final HttpClient httpClient;
-  private final ObjectMapper jsonMapper;
-  private final TaskInfoProvider taskInfoProvider;
-  private final Duration httpTimeout;
-  private final RetryPolicyFactory retryPolicyFactory;
-  private final ListeningExecutorService executorService;
-  private final long numRetries;
+  private static final TreeMap<Integer, Map<Integer, Long>> EMPTY_TREE_MAP = new TreeMap<>();
 
 Review comment:
   ok, probably not worth the risk then





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203099526
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting 
failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link 
#getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap<String, MonitorEntry> runningTasks = new ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap> taskHistories = new 
ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
expectedNumSucceededTasks)
+  {
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+this.maxRetry = maxRetry;
+this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", 
expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+running = true;
+log.info("Starting taskMonitor");
+// NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
+// That listener should be able to send the events reported to TaskRunner 
to this TaskMonitor.
+taskStatusChecker.scheduleAtFixedRate(
+() -> {
+  try {
+final Iterator> iterator = 
runningTasks.entrySet().iterator();
+while (it

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201788708
 
 

 ##
 File path: 
indexing-service/src/test/java/io/druid/indexing/common/task/TaskMonitorTest.java
 ##
 @@ -0,0 +1,220 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.NoopIndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.RunnerTaskState;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.concurrent.Execs;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TaskMonitorTest
 
 Review comment:
   Is there any way to add concurrency tests here? There seem to be a lot of 
potentially shaky concurrency items in the monitor, and if they can be tested, 
that could help shake out bugs early.
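   For illustration, a minimal concurrency smoke-test sketch in the JUnit 4 
style used by this test class. It is a generic pattern, not tied to 
TaskMonitor's actual API; the shared counter is a stand-in for whatever 
monitor state a real test would exercise:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.junit.Assert;
    import org.junit.Test;

    public class ConcurrencySmokeTest
    {
      @Test(timeout = 60_000L)
      public void testConcurrentSubmissions() throws Exception
      {
        final int numThreads = 8;
        final int opsPerThread = 1000;
        final AtomicInteger counter = new AtomicInteger(); // stand-in for shared monitor state
        final CountDownLatch startGate = new CountDownLatch(1);
        final ExecutorService exec = Executors.newFixedThreadPool(numThreads);

        for (int i = 0; i < numThreads; i++) {
          exec.submit(() -> {
            startGate.await(); // release all threads at once to maximize interleaving
            for (int j = 0; j < opsPerThread; j++) {
              counter.incrementAndGet(); // a real test would call into the component under test here
            }
            return null;
          });
        }

        startGate.countDown();
        exec.shutdown();
        Assert.assertTrue(exec.awaitTermination(30, TimeUnit.SECONDS));
        Assert.assertEquals(numThreads * opsPerThread, counter.get());
      }
    }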





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201777132
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // subTa

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201779862
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 
 Review comment:
   

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201773312
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskClient.java
 ##
 @@ -0,0 +1,80 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.indexing.common.IndexTaskClient;
+import io.druid.indexing.common.TaskInfoProvider;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ParallelIndexTaskClient extends IndexTaskClient
+{
+  private final String subtaskId;
+
+  public ParallelIndexTaskClient(
 
 Review comment:
   Suggest making this package-private so that someone doesn't pass a non-Smile 
object mapper to the constructor.
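   For illustration, a minimal sketch of that access pattern (class and method 
names are hypothetical): a package-private constructor means only same-package 
code, such as a factory, can construct the client, and that factory is the one 
place that decides to inject a Smile-configured mapper.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.smile.SmileFactory;

    class ExampleTaskClient
    {
      private final ObjectMapper smileMapper;

      ExampleTaskClient(ObjectMapper smileMapper) // package-private: not callable from outside the package
      {
        this.smileMapper = smileMapper;
      }
    }

    class ExampleTaskClientFactory
    {
      ExampleTaskClient build()
      {
        // Construction is funneled through the factory, which always supplies a Smile mapper.
        return new ExampleTaskClient(new ObjectMapper(new SmileFactory()));
      }
    }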





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201773728
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import io.druid.indexer.TaskState;
+import io.druid.indexing.common.TaskToolbox;
+
+/**
+ * ParallelIndexTaskRunner is the actual task runner of {@link 
ParallelIndexSupervisorTask}. There is currently a single
+ * implementation, i.e. {@link SinglePhaseParallelIndexTaskRunner} which 
supports only best-effort roll-up. We can add
+ * more implementations in the future.
+ */
+public interface ParallelIndexTaskRunner
+{
+  TaskState run(TaskToolbox toolbox) throws Exception;
 
 Review comment:
   This is not terribly descriptive. Can you explain the expectations of the 
`run` method, especially for future implementations vs. the current 
implementation?
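   For illustration, the kind of contract documentation being asked for might 
look like the sketch below. The specific guarantees are inferred from the 
supervisor task's javadoc quoted elsewhere in this thread ("This task succeeds 
only when all its child tasks succeed; otherwise it fails") and are 
assumptions, not statements about the actual implementation:

    import io.druid.indexer.TaskState;
    import io.druid.indexing.common.TaskToolbox;

    public interface ParallelIndexTaskRunner
    {
      /**
       * Runs the parallel indexing job to completion (illustrative contract, assumed):
       * submits sub-tasks, monitors them, and blocks until every sub-task reaches a
       * terminal state. Returns {@link TaskState#SUCCESS} only if all sub-tasks succeed;
       * a sub-task failure, after any retries are exhausted, yields {@link TaskState#FAILED}.
       */
      TaskState run(TaskToolbox toolbox) throws Exception;
    }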





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201763005
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/Counters.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.google.common.util.concurrent.AtomicDouble;
 
 Review comment:
   Did you mean to get the Google Common one?





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201762816
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/appenderator/SegmentAllocateActionGenerator.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.appenderator;
+
+import io.druid.data.input.InputRow;
+import io.druid.indexing.common.actions.TaskAction;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+
+public interface SegmentAllocateActionGenerator
+{
+  TaskAction generate(
 
 Review comment:
   Can you add Javadoc about what this method is supposed to do and what 
implementation guarantees should exist?





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201772643
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSupervisorTask.java
 ##
 @@ -0,0 +1,322 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.stats.RowIngestionMetersFactory;
+import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
+import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.server.security.AuthorizerMapper;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+
+/**
+ * ParallelIndexSupervisorTask is capable of running multiple subTasks for 
parallel indexing. This is
+ * applicable if the input {@link FiniteFirehoseFactory} is splittable. While 
this task is running, it can submit
+ * multiple child tasks to overlords. This task succeeds only when all its 
child tasks succeed; otherwise it fails.
+ *
+ * @see ParallelIndexTaskRunner
+ */
+public class ParallelIndexSupervisorTask extends AbstractTask implements 
ChatHandler
+{
+  static final String TYPE = "index_parallel";
+
+  private static final Logger log = new 
Logger(ParallelIndexSupervisorTask.class);
+
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+  private final RowIngestionMetersFactory rowIngestionMetersFactory;
+
+  private ParallelIndexTaskRunner runner;
+
+  @JsonCreator
+  public ParallelIndexSupervisorTask(
+  @JsonProperty("id") String id,
+  @JsonProperty("resource") TaskResource taskResource,
+  @JsonProperty("spec") ParallelIndexIngestionSpec ingestionSchema,
+  @JsonProperty("context") Map context,
+  @JacksonInject @Nullable IndexingServiceClient indexingServiceClient, // 
null in overlords
+  @JacksonInject @Nullable ChatHandlerProvider chatHandlerProvider, // 
null in overlords
+  @JacksonInject AuthorizerMapper authorizerMapper,
+  @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+  )
+  {
+super(
+getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+null,
+taskResource,
+ingestionSchema.getDataSchema().getDataSource(),
+context
+);
+
+this.ingestionSchema = ingestionSchema;
+
+final FirehoseFactory firehoseFactory = 
ingestionSchema.getIOConfig().getFirehoseFactory();
+if (!(firehoseFactory instanceof FiniteFirehoseFactory)) {
+  throw new IAE("[%s] should implement FiniteFirehoseFactory", 
firehoseFactory.getClass().getSimpleName());
+}
+
+this.baseFirehoseFactory = (FiniteFirehoseFactory) firehoseFactory;
+this.indexingServiceClient = indexingServiceClient;
+this.chatHandlerProvider = chatHandlerProvider;
+this.authorizerMapper = authorizerMapper;
+this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+
+if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() > 0) {
+ 

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201763889
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201760844
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -312,10 +237,13 @@ public DateTime getStartTime(final String id)
   {
 log.debug("GetCheckpoints task[%s] retry[%s]", id, retry);
 try {
-  final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "checkpoints", null, retry);
-  return jsonMapper.readValue(response.getContent(), new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
-  {
-  });
+  final FullResponseHolder response = submitRequestWithEmptyContent(id, HttpMethod.GET, "checkpoints", null, retry);
+  return deserialize(
+  response.getContent(),
+  new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
 
 Review comment:
   This type reference seems to be used in a couple of places. Would it be better as a shared constant class, or as part of JacksonUtils?
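
   For illustration, a minimal sketch of the shared-constant idea. The holder class name below is hypothetical, and the TreeMap<Integer, Map<Integer, Long>> element type is reconstructed from the Kafka checkpoint payload rather than spelled out in this diff:

       import com.fasterxml.jackson.core.type.TypeReference;

       import java.util.Map;
       import java.util.TreeMap;

       // Hypothetical holder so every call site reuses a single TypeReference instance
       // instead of allocating an anonymous subclass on each call.
       public final class KafkaIndexTaskTypeReferences
       {
         public static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS =
             new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
             {
             };

         private KafkaIndexTaskTypeReferences()
         {
           // utility holder, no instances
         }
       }

   Call sites would then read something like: return deserialize(response.getContent(), KafkaIndexTaskTypeReferences.CHECKPOINTS);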





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201759752
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -21,80 +21,32 @@
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.indexer.TaskLocation;
-import io.druid.indexer.TaskStatus;
-import io.druid.indexing.common.RetryPolicy;
-import io.druid.indexing.common.RetryPolicyConfig;
-import io.druid.indexing.common.RetryPolicyFactory;
+import io.druid.indexing.common.IndexTaskClient;
 import io.druid.indexing.common.TaskInfoProvider;
-import io.druid.java.util.common.IAE;
-import io.druid.java.util.common.IOE;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
-import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.FullResponseHandler;
 import io.druid.java.util.http.client.response.FullResponseHolder;
-import io.druid.segment.realtime.firehose.ChatHandlerResource;
-import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
-import org.joda.time.Period;
 
-import javax.ws.rs.core.MediaType;
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
 
-public class KafkaIndexTaskClient
+public class KafkaIndexTaskClient extends IndexTaskClient
 {
-  public static class NoTaskLocationException extends RuntimeException
-  {
-public NoTaskLocationException(String message)
-{
-  super(message);
-}
-  }
-
-  public static class TaskNotRunnableException extends RuntimeException
-  {
-public TaskNotRunnableException(String message)
-{
-  super(message);
-}
-  }
-
-  public static final int MAX_RETRY_WAIT_SECONDS = 10;
-
-  private static final int MIN_RETRY_WAIT_SECONDS = 2;
   private static final EmittingLogger log = new 
EmittingLogger(KafkaIndexTaskClient.class);
-  private static final String BASE_PATH = "/druid/worker/v1/chat";
-  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
-  private static final TreeMap EMPTY_TREE_MAP = new TreeMap();
-
-  private final HttpClient httpClient;
-  private final ObjectMapper jsonMapper;
-  private final TaskInfoProvider taskInfoProvider;
-  private final Duration httpTimeout;
-  private final RetryPolicyFactory retryPolicyFactory;
-  private final ListeningExecutorService executorService;
-  private final long numRetries;
+  private static final TreeMap<Integer, Map<Integer, Long>> EMPTY_TREE_MAP = new TreeMap<>();
 
 Review comment:
   Are there any helper functions to make sure this stays unmodifiable?
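
   One possible direction, sketched under the assumption that the field is only ever used as an empty sentinel and that call sites can accept a SortedMap view (the wrapper class name is made up for illustration). Guava's ImmutableSortedMap.of() would serve a similar purpose if staying on Guava collections is preferred:

       import java.util.Collections;
       import java.util.Map;
       import java.util.SortedMap;
       import java.util.TreeMap;

       public class KafkaCheckpointSentinels
       {
         // Wrapping the sentinel makes any accidental put() fail fast with
         // UnsupportedOperationException instead of silently mutating shared state.
         private static final SortedMap<Integer, Map<Integer, Long>> EMPTY_TREE_MAP =
             Collections.unmodifiableSortedMap(new TreeMap<>());

         public static SortedMap<Integer, Map<Integer, Long>> emptyCheckpoints()
         {
           return EMPTY_TREE_MAP;
         }
       }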





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201766526
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r20196
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // subTa

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201760102
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -180,18 +113,17 @@ public boolean resume(final String id)
 
   if (response.getStatus().equals(HttpResponseStatus.OK)) {
 log.info("Task [%s] paused successfully", id);
-return jsonMapper.readValue(response.getContent(), new TypeReference<Map<Integer, Long>>()
+return deserialize(response.getContent(), new TypeReference<Map<Integer, Long>>()
 
 Review comment:
   Is the return object codified anywhere? Any way to use an object instead of a map?
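
   For what it's worth, a rough sketch of the typed-object alternative. The class name, the "currentOffsets" field name, and the assumption that deserialize() has a Class-based overload are illustrative rather than taken from this PR:

       import com.fasterxml.jackson.annotation.JsonCreator;
       import com.fasterxml.jackson.annotation.JsonProperty;

       import java.util.Map;

       // Hypothetical wrapper for the pause response instead of a raw map.
       public class PauseResponse
       {
         private final Map<Integer, Long> currentOffsets;

         @JsonCreator
         public PauseResponse(@JsonProperty("currentOffsets") Map<Integer, Long> currentOffsets)
         {
           this.currentOffsets = currentOffsets;
         }

         @JsonProperty
         public Map<Integer, Long> getCurrentOffsets()
         {
           return currentOffsets;
         }
       }

   The calling method could then return the typed object directly, e.g. return deserialize(response.getContent(), PauseResponse.class);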





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201761280
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -362,317 +288,70 @@ public boolean setEndOffsets(
 log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, 
endOffsets, finalize);
 
 try {
-  final FullResponseHolder response = submitRequest(
+  final FullResponseHolder response = submitJsonRequest(
   id,
   HttpMethod.POST,
   "offsets/end",
   StringUtils.format("finish=%s", finalize),
-  jsonMapper.writeValueAsBytes(endOffsets),
+  serialize(endOffsets),
   true
   );
-  return response.getStatus().getCode() / 100 == 2;
+  return isSuccess(response);
 }
 catch (NoTaskLocationException e) {
   return false;
 }
 catch (IOException e) {
-  throw new RuntimeException(e);
+  throw Throwables.propagate(e);
 
 Review comment:
   Would it make sense to just allow the method to throw `IOException`?
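
   Roughly what that alternative would look like, sketched from the diff above; the surrounding class is elided, the Map<Integer, Long> parameter type is reconstructed (the archive stripped the generics), and letting serialize() propagate IOException is an assumption about its signature:

       public boolean setEndOffsets(
           final String id,
           final Map<Integer, Long> endOffsets,
           final boolean finalize
       ) throws IOException
       {
         try {
           final FullResponseHolder response = submitJsonRequest(
               id,
               HttpMethod.POST,
               "offsets/end",
               StringUtils.format("finish=%s", finalize),
               serialize(endOffsets),
               true
           );
           return isSuccess(response);
         }
         catch (NoTaskLocationException e) {
           return false;
         }
         // No catch (IOException): the checked exception now surfaces to callers,
         // who decide whether to retry, wrap, or fail.
       }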





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201778402
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // subTa

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201778676
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // subTa

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201765738
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201767065
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.druid.indexing.common.Counters;
+import io.druid.indexing.common.task.Task;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * This action finds a proper {@link io.druid.timeline.partition.ShardSpec} based on counting. It keeps track of
+ * the next shard number per {@link Interval} in {@link Counters}. The next shard number is incremented by 1 whenever a
+ * new {@link SegmentIdentifier} is allocated.
+ */
+public class CountingSegmentAllocateAction implements TaskAction<SegmentIdentifier>
+{
+  private final String dataSource;
+  private final DateTime timestamp;
+  private final GranularitySpec granularitySpec;
+  @JsonDeserialize(keyUsing = IntervalDeserializer.class)
+  private final Map<Interval, String> versions;
+
+  private final SortedSet<Interval> bucketIntervals;
+
+  @JsonCreator
+  public CountingSegmentAllocateAction(
+  @JsonProperty("dataSource") String dataSource,
+  @JsonProperty("timestamp") DateTime timestamp,
+  @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+  @JsonProperty("versions") Map versions
+  )
+  {
+this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
+this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec");
+this.versions = Preconditions.checkNotNull(versions, "versions");
+
+this.bucketIntervals = Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), "bucketIntervals");
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+return dataSource;
+  }
+
+  @JsonProperty
+  public DateTime getTimestamp()
+  {
+return timestamp;
+  }
+
+  @JsonProperty
+  public GranularitySpec getGranularitySpec()
+  {
+return granularitySpec;
+  }
+
+  @JsonProperty
+  public Map<Interval, String> getVersions()
+  {
+return versions;
+  }
+
+  @Override
+  public TypeReference<SegmentIdentifier> getReturnTypeReference()
+  {
+return new TypeReference<SegmentIdentifier>()
 
 Review comment:
   this can be a static final field
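
   Something along these lines; the SegmentIdentifier type parameter is reconstructed from the surrounding class, since the archive stripped the generics:

       // Hoist the anonymous TypeReference into a constant so it is allocated once
       // rather than on every getReturnTypeReference() call.
       private static final TypeReference<SegmentIdentifier> RETURN_TYPE_REFERENCE =
           new TypeReference<SegmentIdentifier>()
           {
           };

       @Override
       public TypeReference<SegmentIdentifier> getReturnTypeReference()
       {
         return RETURN_TYPE_REFERENCE;
       }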





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201770598
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
+{
+  static final String TYPE = "index_sub";
+
+  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory taskClientFactory;
+
+  @JsonCreator
+  public ParallelIndexSubTask(
+  // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+  @JsonProperty("id") @Nullable final String id,
+  @JsonProperty("gr

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201781962
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded("task-monitor-%d");
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. It is
+   * read in the {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. It can also be read by calling {@link #getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap<String, MonitorEntry> runningTasks = new ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap> taskHistories = new 
ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks)
+  {
+this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+this.maxRetry = maxRetry;
+this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+running = true;
+log.info("Starting taskMonitor");
+// NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
+// That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+taskStatusChecker.scheduleAtFixedRate(
+() -> {
+  try {
+final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
+while (it

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201763622
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize
 
 Review comment:
   Can you add more info about which node this runs on? I *think* it normally runs on the middle managers, and it's making calls back to the overlord?
   
   If that is true, what are the considerations for the number of HTTP server threads the overlord needs compared to the settings here for large clusters?





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201774505
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/PushedSegmentsReport.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.timeline.DataSegment;
+
+import java.util.List;
+
+public class PushedSegmentsReport
 
 Review comment:
   Can you add a brief comment on when this is used?
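
   For reference, a minimal class-level comment along these lines would answer 
this; the wording is only a sketch based on how the sub tasks are described 
elsewhere in this patch (they push segments and report them to the supervisor 
task instead of publishing them themselves):

    /**
     * Report sent by a parallel-index sub task to its supervisor task once the
     * sub task has pushed its segments, so the supervisor can publish them.
     * (Sketch of a possible comment, not the author's wording.)
     */
    public class PushedSegmentsReport
    {
      // fields and JSON creator as in the patch
    }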





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201767683
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.druid.indexing.common.Counters;
+import io.druid.indexing.common.task.Task;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * This action is to find a proper {@link 
io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of
+ * the next shard number per {@link Interval} in {@link Counters}. The next 
shard number is incremented by 1 whenever a
+ * new {@link SegmentIdentifier} is allocated.
+ */
+public class CountingSegmentAllocateAction implements TaskAction<SegmentIdentifier>
+{
+  private final String dataSource;
+  private final DateTime timestamp;
+  private final GranularitySpec granularitySpec;
+  @JsonDeserialize(keyUsing = IntervalDeserializer.class)
+  private final Map<Interval, String> versions;
+
+  private final SortedSet<Interval> bucketIntervals;
+
+  @JsonCreator
+  public CountingSegmentAllocateAction(
+  @JsonProperty("dataSource") String dataSource,
+  @JsonProperty("timestamp") DateTime timestamp,
+  @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+  @JsonProperty("versions") Map versions
+  )
+  {
+this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
+this.granularitySpec = Preconditions.checkNotNull(granularitySpec, 
"granularitySpec");
+this.versions = Preconditions.checkNotNull(versions, "versions");
+
+this.bucketIntervals = 
Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), 
"bucketIntervals");
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+return dataSource;
+  }
+
+  @JsonProperty
+  public DateTime getTimestamp()
+  {
+return timestamp;
+  }
+
+  @JsonProperty
+  public GranularitySpec getGranularitySpec()
+  {
+return granularitySpec;
+  }
+
+  @JsonProperty
+  public Map<Interval, String> getVersions()
+  {
+return versions;
+  }
+
+  @Override
+  public TypeReference<SegmentIdentifier> getReturnTypeReference()
+  {
+return new TypeReference<SegmentIdentifier>()
+{
+};
+  }
+
+  @Override
+  public SegmentIdentifier perform(Task task, TaskActionToolbox toolbox)
+  {
+Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
+if (!maybeInterval.isPresent()) {
+  throw new ISE("Could not find interval for timestamp [%s]", timestamp);
+}
+
+final Interval interval = maybeInterval.get();
+if (!bucketIntervals.contains(interval)) {
+  throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", 
interval, granularitySpec);
+}
+
+final Counters counters = toolbox.getCounters();
 
 Review comment:
   There's no way to clear out counters. How does this work for long-running 
toolbox instances? For example, if an overlord runs for weeks or months at a 
time, will it just keep accumulating interval maps?



[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201761921
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java
 ##
 @@ -50,5 +53,8 @@
   @Override
   public void configure(Binder binder)
   {
+binder.bind(new TypeLiteral<IndexTaskClientFactory<KafkaIndexTaskClient>>(){})
 
 Review comment:
   Will this get new-lined on a code reformat?
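
   One way to keep this kind of binding stable under automatic reformatting is 
to name the TypeLiteral first, so the bind(...) call itself stays short. A 
self-contained sketch with placeholder types (the real type arguments were 
stripped from the quoted line, so none of the names below are from the patch):

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Key;
    import com.google.inject.TypeLiteral;

    public class TypeLiteralBindingSketch
    {
      interface TaskClient {}
      static class KafkaClient implements TaskClient {}
      interface ClientFactory<T extends TaskClient> {}
      static class KafkaClientFactory implements ClientFactory<KafkaClient> {}

      public static void main(String[] args)
      {
        AbstractModule module = new AbstractModule()
        {
          @Override
          protected void configure()
          {
            // Naming the TypeLiteral keeps the bind(...) expression short, so a
            // reformat has no long single statement to wrap onto a new line.
            TypeLiteral<ClientFactory<KafkaClient>> factoryType =
                new TypeLiteral<ClientFactory<KafkaClient>>() {};
            bind(factoryType).to(KafkaClientFactory.class);
          }
        };
        ClientFactory<KafkaClient> factory = Guice.createInjector(module)
            .getInstance(Key.get(new TypeLiteral<ClientFactory<KafkaClient>>() {}));
        System.out.println(factory.getClass().getSimpleName());
      }
    }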
   





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201767881
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.druid.indexing.common.Counters;
+import io.druid.indexing.common.task.Task;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * This action is to find a proper {@link 
io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of
+ * the next shard number per {@link Interval} in {@link Counters}. The next 
shard number is incremented by 1 whenever a
+ * new {@link SegmentIdentifier} is allocated.
+ */
+public class CountingSegmentAllocateAction implements TaskAction<SegmentIdentifier>
+{
+  private final String dataSource;
+  private final DateTime timestamp;
+  private final GranularitySpec granularitySpec;
+  @JsonDeserialize(keyUsing = IntervalDeserializer.class)
+  private final Map<Interval, String> versions;
+
+  private final SortedSet<Interval> bucketIntervals;
+
+  @JsonCreator
+  public CountingSegmentAllocateAction(
+  @JsonProperty("dataSource") String dataSource,
+  @JsonProperty("timestamp") DateTime timestamp,
+  @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+  @JsonProperty("versions") Map versions
+  )
+  {
+this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
+this.granularitySpec = Preconditions.checkNotNull(granularitySpec, 
"granularitySpec");
+this.versions = Preconditions.checkNotNull(versions, "versions");
+
+this.bucketIntervals = 
Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), 
"bucketIntervals");
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+return dataSource;
+  }
+
+  @JsonProperty
+  public DateTime getTimestamp()
+  {
+return timestamp;
+  }
+
+  @JsonProperty
+  public GranularitySpec getGranularitySpec()
+  {
+return granularitySpec;
+  }
+
+  @JsonProperty
+  public Map<Interval, String> getVersions()
+  {
+return versions;
+  }
+
+  @Override
+  public TypeReference<SegmentIdentifier> getReturnTypeReference()
+  {
+return new TypeReference<SegmentIdentifier>()
+{
+};
+  }
+
+  @Override
+  public SegmentIdentifier perform(Task task, TaskActionToolbox toolbox)
+  {
+Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
+if (!maybeInterval.isPresent()) {
+  throw new ISE("Could not find interval for timestamp [%s]", timestamp);
+}
+
+final Interval interval = maybeInterval.get();
+if (!bucketIntervals.contains(interval)) {
+  throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", 
interval, granularitySpec);
+}
+
+final Counters counters = toolbox.getCounters();
 
 Review comment:
   That seems like it will cause heap problems. Is there any sort of cleanup 
mechanism?
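
   A sketch of one possible cleanup approach, assuming counters can be grouped 
per owning task (the class below is a placeholder, not the patch's Counters 
API): drop a group's counters when the corresponding task finishes, so a 
long-running overlord does not accumulate per-interval entries forever.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // Placeholder sketch of a counters holder with an explicit cleanup hook.
    public class GroupedCounters
    {
      private final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicInteger>> groups =
          new ConcurrentHashMap<>();

      public int increment(String groupId, String key)
      {
        return groups.computeIfAbsent(groupId, g -> new ConcurrentHashMap<>())
                     .computeIfAbsent(key, k -> new AtomicInteger(0))
                     .incrementAndGet();
      }

      // Called when the owning task completes; removes every counter in the group.
      public void clear(String groupId)
      {
        groups.remove(groupId);
      }
    }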



[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201781345
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting 
failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor<T extends Task>
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link 
#getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap<String, MonitorEntry> runningTasks = new ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap<String, TaskHistory<T>> taskHistories = new ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
expectedNumSucceededTasks)
+  {
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+this.maxRetry = maxRetry;
+this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", 
expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+running = true;
+log.info("Starting taskMonitor");
+// NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
+// That listener should be able to send the events reported to TaskRunner 
to this TaskMonitor.
+taskStatusChecker.scheduleAtFixedRate(
+() -> {
+  try {
+final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
+while (it

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201782258
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
 ##
 @@ -465,6 +467,23 @@ public RemoteTaskRunnerConfig getConfig()
 return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), 
runningTasks.values(), completeTasks.values()));
   }
 
+  @Nullable
+  @Override
+  public RunnerTaskState getRunnerTaskState(String taskId)
 
 Review comment:
   Can this return `Optional<RunnerTaskState>` instead of being `@Nullable`?
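
   For comparison, a minimal sketch of the two signatures (the enum and lookup 
body below are placeholders, not the RemoteTaskRunner code):

    import java.util.Optional;
    import javax.annotation.Nullable;

    public class RunnerTaskStateLookupSketch
    {
      enum RunnerTaskState { PENDING, RUNNING, NONE }

      // Nullable style: every caller has to remember the null check.
      @Nullable
      public RunnerTaskState getRunnerTaskState(String taskId)
      {
        return lookup(taskId);
      }

      // Optional style: absence is part of the method signature.
      public Optional<RunnerTaskState> getRunnerTaskStateOptional(String taskId)
      {
        return Optional.ofNullable(lookup(taskId));
      }

      @Nullable
      private RunnerTaskState lookup(String taskId)
      {
        return "known-task".equals(taskId) ? RunnerTaskState.RUNNING : null;
      }
    }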





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201780850
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting 
failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor<T extends Task>
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link 
#getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap<String, MonitorEntry> runningTasks = new ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap<String, TaskHistory<T>> taskHistories = new ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
expectedNumSucceededTasks)
+  {
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+this.maxRetry = maxRetry;
+this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", 
expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+running = true;
+log.info("Starting taskMonitor");
+// NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
+// That listener should be able to send the events reported to TaskRunner 
to this TaskMonitor.
+taskStatusChecker.scheduleAtFixedRate(
+() -> {
+  try {
+final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
+while (it

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201772060
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
+{
+  static final String TYPE = "index_sub";
+
+  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory 
taskClientFactory;
+
+  @JsonCreator
+  public ParallelIndexSubTask(
+  // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
+  @JsonProperty("id") @Nullable final String id,
+  @JsonProperty("gr

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201781409
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting 
failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor<T extends Task>
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link 
#getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap<String, MonitorEntry> runningTasks = new ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap<String, TaskHistory<T>> taskHistories = new ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
expectedNumSucceededTasks)
+  {
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+this.maxRetry = maxRetry;
+this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", 
expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+running = true;
+log.info("Starting taskMonitor");
+// NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
+// That listener should be able to send the events reported to TaskRunner 
to this TaskMonitor.
+taskStatusChecker.scheduleAtFixedRate(
+() -> {
+  try {
+final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
+while (it

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201774618
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/PushedSegmentsReport.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.timeline.DataSegment;
+
+import java.util.List;
+
+public class PushedSegmentsReport
 
 Review comment:
   Like, is it used in ALL indexing tasks, or just the native-batch one?





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201764385
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
 
 Review comment:
   Or is this going from the overlord to the indexing tasks?
   
   What if there are a few hundred middle managers? Does that cause any 
problems here?





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201779095
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting 
failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor<T extends Task>
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link 
#getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap<String, MonitorEntry> runningTasks = new ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap<String, TaskHistory<T>> taskHistories = new ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
 
 Review comment:
   Would this make more sense as a ReadWriteLock?
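
   For comparison, a standalone sketch of the ReadWriteLock version of these 
counters (the field names mirror the patch, but the class is a toy). For a few 
int fields updated together, a plain synchronized block is usually just as 
good; a ReadWriteLock mainly pays off when reads vastly outnumber writes.

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class TaskCountsSketch
    {
      private final ReadWriteLock lock = new ReentrantReadWriteLock();
      private int numRunningTasks;
      private int numSucceededTasks;
      private int numFailedTasks;

      public void onTaskCompleted(boolean succeeded)
      {
        lock.writeLock().lock();
        try {
          numRunningTasks--;
          if (succeeded) {
            numSucceededTasks++;
          } else {
            numFailedTasks++;
          }
        }
        finally {
          lock.writeLock().unlock();
        }
      }

      public int getNumSucceededTasks()
      {
        lock.readLock().lock();
        try {
          return numSucceededTasks;
        }
        finally {
          lock.readLock().unlock();
        }
      }
    }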





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201771117
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
+{
+  static final String TYPE = "index_sub";
+
+  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory 
taskClientFactory;
+
+  @JsonCreator
+  public ParallelIndexSubTask(
+  // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
+  @JsonProperty("id") @Nullable final String id,
+  @JsonProperty("gr

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201769468
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
 
 Review comment:
   Is it possible to use the Java one, `java.util.Optional`, here?
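
   For reference, the Guava calls quoted above map onto `java.util.Optional` 
almost one-for-one; a stand-in sketch (the String-returning bucketInterval 
below is just a placeholder for the real Interval-returning method):

    import java.util.Optional;

    public class OptionalMigrationSketch
    {
      static Optional<String> bucketInterval(long timestamp)
      {
        return timestamp >= 0 ? Optional.of("2018-07-11/2018-07-12") : Optional.empty();
      }

      public static void main(String[] args)
      {
        // Guava: isPresent() and get() have the same names on java.util.Optional.
        Optional<String> maybeInterval = bucketInterval(42L);
        if (!maybeInterval.isPresent()) {
          throw new IllegalStateException("Could not find interval");
        }
        // Guava's orNull() becomes orElse(null).
        String intervalOrNull = bucketInterval(-1L).orElse(null);
        System.out.println(maybeInterval.get() + ", " + intervalOrNull);
      }
    }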





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201774095
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTuningConfig.java
 ##
 @@ -0,0 +1,195 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+@JsonTypeName("index_parallel")
+public class ParallelIndexTuningConfig extends IndexTuningConfig
+{
+  private static final int DEFAULT_MAX_NUM_BATCH_TASKS = Integer.MAX_VALUE; // 
unlimited
+  private static final int DEFAULT_MAX_RETRY = 3;
+  private static final long DEFAULT_TASK_STATUS_CHECK_PERIOD_MS = 1000;
+
+  private static final Duration DEFAULT_CHAT_HANDLER_TIMEOUT = new 
Period("PT10S").toStandardDuration();
+  private static final int DEFAULT_CHAT_HANDLER_NUM_RETRIES = 5;
+
+  private final int maxNumSubTasks;
+  private final int maxRetry;
+  private final long taskStatusCheckPeriodMs;
+
+  private final Duration chatHandlerTimeout;
+  private final int chatHandlerNumRetries;
+
+  public static ParallelIndexTuningConfig defaultConfig()
+  {
+return new ParallelIndexTuningConfig(
+null,
 
 Review comment:
   lol
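
   One way to avoid the long run of nulls that defaultConfig() passes here is a 
small builder, so callers only name the settings they override; a toy sketch 
with made-up fields, not the actual tuning config:

    public class TuningConfigSketch
    {
      private final int maxNumSubTasks;
      private final int maxRetry;

      private TuningConfigSketch(Builder builder)
      {
        this.maxNumSubTasks = builder.maxNumSubTasks;
        this.maxRetry = builder.maxRetry;
      }

      public static Builder builder()
      {
        return new Builder();
      }

      public static class Builder
      {
        private int maxNumSubTasks = Integer.MAX_VALUE; // default: unlimited
        private int maxRetry = 3;

        public Builder maxNumSubTasks(int n) { this.maxNumSubTasks = n; return this; }
        public Builder maxRetry(int n) { this.maxRetry = n; return this; }

        public TuningConfigSketch build() { return new TuningConfigSketch(this); }
      }
    }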





[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201770798
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
+{
+  static final String TYPE = "index_sub";
+
+  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory 
taskClientFactory;
+
+  @JsonCreator
+  public ParallelIndexSubTask(
+  // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
+  @JsonProperty("id") @Nullable final String id,
+  @JsonProperty("gr

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201769404
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
 
 Review comment:
   is this the right import?
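
The import in question is org.codehaus.plexus.util.FileUtils, which pulls plexus-utils into indexing-service for what is likely a simple file operation. As a hedged sketch only (the quoted diff is cut off before the call site, so the exact usage is an assumption), recursive directory cleanup can be done with the JDK's java.nio.file instead:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public final class DirectoryCleanup
{
  private DirectoryCleanup() {}

  // Stand-in for FileUtils.deleteDirectory(File): deletes dir and everything under it.
  public static void deleteRecursively(Path dir) throws IOException
  {
    if (!Files.exists(dir)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(dir)) {
      // Reverse order so children are deleted before their parent directories.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        }
        catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    }
  }
}

If the subtask needs more than deletion (copying, temp files), java.nio.file.Files covers those cases too, which would keep the plexus dependency out of this module.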




[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201771165
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
+{
+  static final String TYPE = "index_sub";
+
+  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory 
taskClientFactory;
+
+  @JsonCreator
+  public ParallelIndexSubTask(
+  // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
+  @JsonProperty("id") @Nullable final String id,
+  @JsonProperty("gr

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201761583
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -362,317 +288,70 @@ public boolean setEndOffsets(
 log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, 
endOffsets, finalize);
 
 try {
-  final FullResponseHolder response = submitRequest(
+  final FullResponseHolder response = submitJsonRequest(
   id,
   HttpMethod.POST,
   "offsets/end",
   StringUtils.format("finish=%s", finalize),
-  jsonMapper.writeValueAsBytes(endOffsets),
+  serialize(endOffsets),
   true
   );
-  return response.getStatus().getCode() / 100 == 2;
+  return isSuccess(response);
 }
 catch (NoTaskLocationException e) {
   return false;
 }
 catch (IOException e) {
-  throw new RuntimeException(e);
+  throw Throwables.propagate(e);
 }
   }
 
  public ListenableFuture<Boolean> stopAsync(final String id, final boolean publish)
   {
-return executorService.submit(
-new Callable<Boolean>()
-{
-  @Override
-  public Boolean call()
-  {
-return stop(id, publish);
-  }
-}
-);
+return doAsync(() -> stop(id, publish));
   }
 
  public ListenableFuture<Boolean> resumeAsync(final String id)
   {
-return executorService.submit(
-new Callable<Boolean>()
-{
-  @Override
-  public Boolean call()
-  {
-return resume(id);
-  }
-}
-);
+return doAsync(() -> resume(id));
 
 Review comment:
   (out of scope for this PR) since java has native completable futures now, do 
these need to be listenable futures?
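
For illustration, a hedged sketch of the alternative the comment hints at: the doAsync helper re-expressed with java.util.concurrent.CompletableFuture rather than Guava's ListenableFuture. This is not what the PR does, and everything here apart from CompletableFuture.supplyAsync itself (the class name, the fixed-size executor, the placeholder stop body) is assumed for the sake of the example:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class CompletableFutureClientSketch
{
  private final ExecutorService executorService = Executors.newFixedThreadPool(4);

  // supplyAsync runs the supplier on the given executor and completes the returned future with its result.
  private <T> CompletableFuture<T> doAsync(Supplier<T> action)
  {
    return CompletableFuture.supplyAsync(action, executorService);
  }

  public CompletableFuture<Boolean> stopAsync(String id, boolean publish)
  {
    return doAsync(() -> stop(id, publish));
  }

  private boolean stop(String id, boolean publish)
  {
    // Placeholder for the synchronous HTTP call the real client makes.
    return true;
  }
}

Callers that currently attach Guava FutureCallbacks would use whenComplete or thenApply on the returned CompletableFuture instead.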




[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-10 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201513918
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-09 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r198963530
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-09 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r198963358
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-09 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r198962953
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-09 Thread GitBox
drcrallen commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r198962691
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if