[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201892300

## File path: indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
##
@@ -465,6 +467,23 @@ public RemoteTaskRunnerConfig getConfig()
     return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
   }
 
+  @Nullable
+  @Override
+  public RunnerTaskState getRunnerTaskState(String taskId)

Review comment:
What is the benefit of using Optional instead of returning null here?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org
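The question above is about the trade-off between a `@Nullable` return and an `Optional` return for a lookup such as `getRunnerTaskState`. The following sketch puts the two styles side by side. `LookupState`, `TaskStateLookup`, `getStateOrNull` and `findState` are hypothetical names. They are not Druid's `TaskRunner` or `RunnerTaskState` API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a runner-side task state (not Druid's RunnerTaskState).
enum LookupState { PENDING, RUNNING }

// Hypothetical lookup showing both return styles for "task may be unknown".
class TaskStateLookup
{
  private final Map<String, LookupState> states = new ConcurrentHashMap<>();

  void put(String taskId, LookupState state)
  {
    states.put(taskId, state);
  }

  // Style 1: null means "unknown task"; callers must remember to check for it.
  LookupState getStateOrNull(String taskId)
  {
    return states.get(taskId);
  }

  // Style 2: absence is part of the return type, so callers have to handle it explicitly.
  Optional<LookupState> findState(String taskId)
  {
    return Optional.ofNullable(states.get(taskId));
  }
}
```

Each style has a benefit:

- **`@Nullable` form:** it needs no wrapping of the result and matches the annotation already on the method.
- **`Optional` form:** it makes the absent case harder to ignore. For example, `lookup.findState(id).map(LookupState::name).orElse("UNKNOWN")` handles it in a single expression.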
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201892018

## File path: indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
##
@@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link #getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap runningTasks = new ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in {@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap> taskHistories = new ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks)
+  {
+    this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+    this.maxRetry = maxRetry;
+    this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+    log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+    running = true;
+    log.info("Starting taskMonitor");
+    // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
+    // That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+    taskStatusChecker.scheduleAtFixedRate(
+        () -> {
+          try {
+            final Iterator> iterator = runningTasks.entrySet().iterator();

Review comment:
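The class javadoc quoted above describes a monitor that does four things: it submits tasks, polls their statuses, resubmits failed ones, and reports a final result. The following JDK-only sketch shows that polling-and-retry loop. `SubTaskStatus`, `SimpleTaskMonitor`, the status-lookup and submit hooks, and the attempt counting are all hypothetical simplifications. They are not the PR's `TaskMonitor` or `IndexingServiceClient` API.

The quoted `start()` wraps each pass in a `try` block, and the sketch does the same. This matters because an exception escaping a `scheduleAtFixedRate` task suppresses all subsequent runs.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical status reported for a sub-task (not Druid's TaskState).
enum SubTaskStatus { RUNNING, SUCCESS, FAILED }

class SimpleTaskMonitor
{
  // specId -> number of attempts made so far for that spec
  private final ConcurrentMap<String, Integer> runningTasks = new ConcurrentHashMap<>();
  private final Function<String, SubTaskStatus> statusSource; // hypothetical status lookup by spec id
  private final Consumer<String> submitter;                   // hypothetical (re)submission hook
  private final int maxRetry;
  private final AtomicInteger numSucceeded = new AtomicInteger();
  private final AtomicInteger numFailed = new AtomicInteger();
  private ScheduledExecutorService checker;

  SimpleTaskMonitor(Function<String, SubTaskStatus> statusSource, Consumer<String> submitter, int maxRetry)
  {
    this.statusSource = statusSource;
    this.submitter = submitter;
    this.maxRetry = maxRetry;
  }

  void submit(String specId)
  {
    runningTasks.put(specId, 1);
    submitter.accept(specId);
  }

  // One polling pass over the running tasks; start() schedules this periodically.
  void checkOnce()
  {
    Iterator<Map.Entry<String, Integer>> iterator = runningTasks.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<String, Integer> entry = iterator.next();
      SubTaskStatus status = statusSource.apply(entry.getKey());
      if (status == SubTaskStatus.SUCCESS) {
        numSucceeded.incrementAndGet();
        iterator.remove(); // ConcurrentHashMap iterators support remove()
      } else if (status == SubTaskStatus.FAILED) {
        int attempts = entry.getValue();
        if (attempts <= maxRetry) {
          runningTasks.put(entry.getKey(), attempts + 1); // overwriting an existing key is safe mid-iteration
          submitter.accept(entry.getKey());
        } else {
          numFailed.incrementAndGet();
          iterator.remove();
        }
      }
      // RUNNING or unknown (null): leave the task for the next pass.
    }
  }

  synchronized void start(long periodMillis)
  {
    checker = Executors.newSingleThreadScheduledExecutor();
    checker.scheduleAtFixedRate(() -> {
      try {
        checkOnce();
      }
      catch (RuntimeException e) {
        // Swallowed so that polling continues; a real monitor would log this.
      }
    }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  synchronized void stop()
  {
    if (checker != null) {
      checker.shutdownNow();
    }
  }

  boolean isDone() { return runningTasks.isEmpty(); }

  int getNumSucceeded() { return numSucceeded.get(); }

  int getNumFailed() { return numFailed.get(); }
}
```

With `maxRetry = 1`, a spec is attempted at most twice before it counts as failed. Calling `checkOnce()` directly, rather than through `start()`, keeps the behaviour deterministic for testing.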
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891941

## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
##
@@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> taskCompleteEvents =
+      new LinkedBlockingDeque<>();
+
+  //
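The javadoc above states that a single phase without shuffling can only provide best-effort roll-up. A small worked example makes the reason concrete. In this hypothetical sketch, each input split is rolled up on its own, as an independent sub-task would do. Rows that share a key but land in different splits are therefore never merged. A perfect roll-up over all rows, which conceptually needs the shuffle, would merge them. `RollupDemo`, `Row` and the sum aggregation are illustrative assumptions, not Druid APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RollupDemo
{
  // A raw input row: a grouping key (e.g. truncated timestamp + dimensions) and one metric.
  static final class Row
  {
    final String key;
    final long value;

    Row(String key, long value)
    {
      this.key = key;
      this.value = value;
    }
  }

  // Rolls up one batch of rows: one output row per distinct key, metric summed.
  static Map<String, Long> rollup(List<Row> rows)
  {
    Map<String, Long> out = new LinkedHashMap<>();
    for (Row row : rows) {
      out.merge(row.key, row.value, Long::sum);
    }
    return out;
  }

  // Best-effort: each split is rolled up independently and the outputs are concatenated.
  static List<Map.Entry<String, Long>> bestEffort(List<List<Row>> splits)
  {
    List<Map.Entry<String, Long>> out = new ArrayList<>();
    for (List<Row> split : splits) {
      out.addAll(rollup(split).entrySet());
    }
    return out;
  }

  // Perfect: all rows are brought together first (what a shuffle provides), then rolled up.
  static Map<String, Long> perfect(List<List<Row>> splits)
  {
    List<Row> all = new ArrayList<>();
    splits.forEach(all::addAll);
    return rollup(all);
  }
}
```

Consider two splits where the key `2018-07-01/US` appears in both. Best-effort roll-up emits three rows (two of them for `US`), while perfect roll-up emits two rows with the `US` metrics summed.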
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201892027

## File path: indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
##
@@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link #getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap runningTasks = new ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in {@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap> taskHistories = new ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks)
+  {
+    this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+    this.maxRetry = maxRetry;
+    this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+    log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+    running = true;
+    log.info("Starting taskMonitor");
+    // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
+    // That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+    taskStatusChecker.scheduleAtFixedRate(
+        () -> {
+          try {
+            final Iterator> iterator = runningTasks.entrySet().iterator();
+            while
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891972

## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
##
@@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> taskCompleteEvents =
+      new LinkedBlockingDeque<>();
+
+  //
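The `taskCompleteEvents` field, a `BlockingQueue` backed by a `LinkedBlockingDeque`, suggests a producer/consumer hand-off. In that pattern, the monitoring side reports sub-task completions and the runner side consumes them.

The following JDK-only sketch shows the pattern. `CompletionEvent`, `CompletionHandoff` and the timeout-based wait loop are hypothetical. The PR's actual `SubTaskCompleteEvent` type and its consumption logic are not shown in this hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical completion event: which sub-task finished and whether it succeeded.
final class CompletionEvent
{
  final String subTaskId;
  final boolean succeeded;

  CompletionEvent(String subTaskId, boolean succeeded)
  {
    this.subTaskId = subTaskId;
    this.succeeded = succeeded;
  }
}

class CompletionHandoff
{
  private final BlockingQueue<CompletionEvent> events = new LinkedBlockingDeque<>();

  // Called from the monitoring thread when a sub-task reaches a terminal state.
  void onComplete(CompletionEvent event)
  {
    events.offer(event); // unbounded deque, so offer() always succeeds
  }

  // Called by the runner: collects up to `expected` events, stopping early if none arrives within the timeout.
  List<CompletionEvent> awaitAll(int expected, long timeout, TimeUnit unit)
  {
    List<CompletionEvent> received = new ArrayList<>();
    try {
      while (received.size() < expected) {
        CompletionEvent event = events.poll(timeout, unit);
        if (event == null) {
          break; // no event within the timeout; the caller decides whether that is an error
        }
        received.add(event);
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
    }
    return received;
  }
}
```

The queue decouples the two threads. The monitor never blocks on the runner, and the runner can wait with a bounded timeout instead of polling a shared counter.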
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891892

## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskRunner.java
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import io.druid.indexer.TaskState;
+import io.druid.indexing.common.TaskToolbox;
+
+/**
+ * ParallelIndexTaskRunner is the actual task runner of {@link ParallelIndexSupervisorTask}. There is currently a single
+ * implementation, i.e. {@link SinglePhaseParallelIndexTaskRunner} which supports only best-effort roll-up. We can add
+ * more implementations in the future.
+ */
+public interface ParallelIndexTaskRunner
+{
+  TaskState run(TaskToolbox toolbox) throws Exception;

Review comment:
So, `ParallelIndexTaskRunner` is the one that actually processes the task. It can have multiple implementations (maybe two), one per distributed indexing algorithm described in https://github.com/apache/incubator-druid/issues/5543.
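The comment describes `ParallelIndexTaskRunner` as the component that does the actual work for `ParallelIndexSupervisorTask`. It leaves room for more than one implementation, one per distributed indexing algorithm.

The following sketch shows that supervisor-to-runner delegation. `IndexRunner`, `SinglePhaseRunner`, `TwoPhaseRunner`, `Supervisor`, `RunResult` and the `perfectRollup` switch are hypothetical names for illustration only. The real interface takes a `TaskToolbox`, may throw, and returns Druid's `TaskState`.

```java
// Hypothetical outcome type standing in for Druid's TaskState.
enum RunResult { SUCCESS, FAILED }

// Hypothetical analogue of ParallelIndexTaskRunner: a single method that does the actual work.
interface IndexRunner
{
  RunResult run();
}

// Best-effort roll-up: sub-tasks index their splits independently, without a shuffle.
class SinglePhaseRunner implements IndexRunner
{
  @Override
  public RunResult run()
  {
    return RunResult.SUCCESS; // placeholder for submitting and monitoring sub-tasks
  }
}

// A possible additional implementation that would shuffle intermediate data for perfect roll-up.
class TwoPhaseRunner implements IndexRunner
{
  @Override
  public RunResult run()
  {
    throw new UnsupportedOperationException("not implemented in this sketch");
  }
}

// Analogue of the supervisor task: chooses a runner and delegates to it.
class Supervisor
{
  private final boolean perfectRollup;

  Supervisor(boolean perfectRollup)
  {
    this.perfectRollup = perfectRollup;
  }

  IndexRunner createRunner()
  {
    return perfectRollup ? new TwoPhaseRunner() : new SinglePhaseRunner();
  }

  RunResult run()
  {
    return createRunner().run();
  }
}
```

Keeping the algorithm behind one interface means the supervisor's own lifecycle code does not change when another runner is added. Only the selection in `createRunner()` does.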
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891935

## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
##
@@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> taskCompleteEvents =
+      new LinkedBlockingDeque<>();
+
+  //
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891918

## File path: indexing-service/src/main/java/io/druid/indexing/common/task/PushedSegmentsReport.java
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.timeline.DataSegment;
+
+import java.util.List;
+
+public class PushedSegmentsReport

Review comment:
Just the native parallel batch one. Added javadoc.
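According to the reply, `PushedSegmentsReport` is used only by native parallel batch indexing. Its imports (`JsonCreator`, `JsonProperty`, `Preconditions`, `DataSegment`, `List`) suggest an immutable, serializable value that carries a list of segments.

The following JDK-only sketch shows such a report. The fields (`taskId` and `segmentIds`) and the `SegmentsReport` name are assumptions for illustration. Plain strings stand in for `DataSegment`, the Jackson annotations are omitted, and `Objects.requireNonNull` plays the role of Guava's `Preconditions.checkNotNull`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified analogue of a "pushed segments" report sent by one sub-task.
final class SegmentsReport
{
  private final String taskId;
  private final List<String> segmentIds; // stand-in for List<DataSegment>

  SegmentsReport(String taskId, List<String> segmentIds)
  {
    // Reject nulls up front, as Preconditions.checkNotNull would.
    this.taskId = Objects.requireNonNull(taskId, "taskId");
    // Defensive, unmodifiable copy so later changes to the caller's list cannot leak in.
    this.segmentIds = Collections.unmodifiableList(new ArrayList<>(Objects.requireNonNull(segmentIds, "segmentIds")));
  }

  String getTaskId()
  {
    return taskId;
  }

  List<String> getSegmentIds()
  {
    return segmentIds;
  }
}
```

Making the report immutable keeps it safe to hand between threads. For example, it could pass from an HTTP handler thread to the runner that eventually publishes the segments.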
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891941 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java ## @@ -0,0 +1,624 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputSplit; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status; +import io.druid.indexing.common.task.TaskMonitor.MonitorEntry; +import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent; +import io.druid.indexing.common.task.TaskMonitor.TaskHistory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.appenderator.UsedSegmentChecker; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlers; +import io.druid.server.security.Action; +import io.druid.server.security.AuthorizerMapper; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; 
+import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and + * monitor multiple {@link ParallelIndexSubTask}s. + * + * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As + * a result, this task can't be used for perfect rollup. + */ +public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner, ChatHandler +{ + private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class); + + private final String taskId; + private final String groupId; + private final ParallelIndexIngestionSpec ingestionSchema; + private final Map context; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final int maxNumTasks; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + + private final BlockingQueue> taskCompleteEvents = + new LinkedBlockingDeque<>(); + + //
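The runner javadoc quoted above says that indexing happens in a single phase with no shuffle of intermediate data, so this runner cannot be used for perfect rollup. The sketch below shows why. It is a minimal sketch under simplifying assumptions. A row is reduced to a rollup key (think truncated timestamp plus dimension values) and one long metric. `SketchRow` and `BestEffortRollupDemo` are hypothetical names, not Druid classes. Each worker can only combine the rows inside its own split, so a key that occurs in two splits ends up as two rows. Perfect rollup needs every row with the same key in one place, which is what a shuffle would provide.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified row: a rollup key (e.g. truncated timestamp + dimension values)
// and one summed metric. Not Druid's InputRow.
final class SketchRow {
  final String key;
  final long value;

  SketchRow(String key, long value) {
    this.key = key;
    this.value = value;
  }
}

final class BestEffortRollupDemo {
  // Rolls up one batch of rows: values with the same key are summed.
  static Map<String, Long> rollUp(List<SketchRow> rows) {
    Map<String, Long> out = new LinkedHashMap<>();
    for (SketchRow row : rows) {
      out.merge(row.key, row.value, Long::sum);
    }
    return out;
  }

  // Single phase (no shuffle): each split is rolled up on its own and the outputs are
  // concatenated, so a key that appears in k splits yields k rows.
  static List<Map.Entry<String, Long>> singlePhase(List<List<SketchRow>> splits) {
    List<Map.Entry<String, Long>> result = new ArrayList<>();
    for (List<SketchRow> split : splits) {
      result.addAll(rollUp(split).entrySet());
    }
    return result;
  }

  // Perfect rollup: all rows sharing a key must be combined in one place, which is what a
  // shuffle would arrange; modeled here by rolling up every row together.
  static Map<String, Long> perfect(List<List<SketchRow>> splits) {
    List<SketchRow> all = new ArrayList<>();
    for (List<SketchRow> split : splits) {
      all.addAll(split);
    }
    return rollUp(all);
  }
}
```

Suppose the key `2018-07-01|us` appears in two splits, with values 1 and 2 in the first and 4 in the second. Then `singlePhase` yields two rows (3 and 4), while `perfect` yields one row (7).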
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891798 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; Review comment: That would require many changes to `GranularitySpec` and all of its call sites, so it should be done in a separate PR.
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891847 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask +{ + static final String TYPE = "index_sub"; + + private static final Logger log = new Logger(ParallelIndexSubTask.class); + + private final int numAttempts; + private final ParallelIndexIngestionSpec ingestionSchema; + private final String supervisorTaskId; + private final IndexingServiceClient indexingServiceClient; + private final IndexTaskClientFactory taskClientFactory; + + @JsonCreator + public ParallelIndexSubTask( + // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask + @JsonProperty("id") @Nullable final String id, +
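The `ParallelIndexSubTask` javadoc splits the work between two parties. Each sub-task generates and pushes segments, then reports them to the supervisor, and only the supervisor publishes. The sketch below shows that hand-off. It is a minimal sketch, and it makes two assumptions. Reports arrive as plain method calls; in the PR they appear to travel through `ParallelIndexTaskClient`. The supervisor also knows up front how many sub-tasks to expect. `PushedSegment` and `ReportingSupervisor` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a segment a sub-task has already pushed to deep storage.
final class PushedSegment {
  final String id;

  PushedSegment(String id) {
    this.id = id;
  }
}

// Hypothetical supervisor side of the hand-off: sub-tasks only report, the supervisor publishes.
final class ReportingSupervisor {
  private final int expectedSubTasks;
  private final Map<String, List<PushedSegment>> reports = new HashMap<>();
  private List<PushedSegment> published = Collections.emptyList();

  ReportingSupervisor(int expectedSubTasks) {
    this.expectedSubTasks = expectedSubTasks;
  }

  // Called once per sub-task after it has pushed its segments; a repeated report from the
  // same sub-task replaces the earlier one instead of duplicating segments.
  synchronized void report(String subTaskId, List<PushedSegment> segments) {
    reports.put(subTaskId, new ArrayList<>(segments));
  }

  // Publishes every reported segment in one step, but only after all sub-tasks reported.
  synchronized boolean publishIfComplete() {
    if (reports.size() < expectedSubTasks) {
      return false; // still waiting: nothing is published
    }
    List<PushedSegment> all = new ArrayList<>();
    for (List<PushedSegment> segments : reports.values()) {
      all.addAll(segments);
    }
    published = Collections.unmodifiableList(all);
    return true;
  }

  synchronized List<PushedSegment> published() {
    return published;
  }
}
```

Because publication happens in one place, a sub-task that fails midway leaves nothing published. The runner's import of `SegmentTransactionalInsertAction` hints at a single transactional publish, but this excerpt does not show how the real supervisor does it.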
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891822 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask +{ + static final String TYPE = "index_sub"; + + private static final Logger log = new Logger(ParallelIndexSubTask.class); + + private final int numAttempts; + private final ParallelIndexIngestionSpec ingestionSchema; + private final String supervisorTaskId; + private final IndexingServiceClient indexingServiceClient; + private final IndexTaskClientFactory taskClientFactory; + + @JsonCreator + public ParallelIndexSubTask( + // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask + @JsonProperty("id") @Nullable final String id, +
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891862 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskClient.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.indexing.common.IndexTaskClient; +import io.druid.indexing.common.TaskInfoProvider; +import io.druid.java.util.common.ISE; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.timeline.DataSegment; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.Duration; + +import java.io.IOException; +import java.util.List; + +public class ParallelIndexTaskClient extends IndexTaskClient +{ + private final String subtaskId; + + public ParallelIndexTaskClient( Review comment: Changed.
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891892 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskRunner.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.indexing.common.task; + +import io.druid.indexer.TaskState; +import io.druid.indexing.common.TaskToolbox; + +/** + * ParallelIndexTaskRunner is the actual task runner of {@link ParallelIndexSupervisorTask}. There is currently a single + * implementation, i.e. {@link SinglePhaseParallelIndexTaskRunner} which supports only best-effort roll-up. We can add + * more implementations in the future. + */ +public interface ParallelIndexTaskRunner +{ + TaskState run(TaskToolbox toolbox) throws Exception; Review comment: So, `ParallelIndexTaskRunner` is the one actually processing the task. It can have multiple implementations based on the distributed indexing algorithm described in https://github.com/apache/incubator-druid/issues/5543.
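The review comment describes `ParallelIndexTaskRunner` as the piece that actually processes the task. It leaves room for multiple implementations based on the distributed indexing algorithms discussed in issue #5543. The sketch below shows that split as a strategy pattern. It is a minimal sketch with simplified stand-ins:

* `SketchTaskState` replaces `TaskState`.
* The `TaskToolbox` parameter is dropped.
* A `Predicate` plays the role of processing one split.
* A runner exception is treated as task failure. This is an assumption of the sketch; the excerpt does not show it.

All `Sketch*` names are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

// Simplified stand-in for io.druid.indexer.TaskState.
enum SketchTaskState { SUCCESS, FAILED }

// Same shape as ParallelIndexTaskRunner, minus the TaskToolbox parameter.
interface SketchParallelRunner {
  SketchTaskState run() throws Exception;
}

// Hypothetical single-phase runner: each split is processed independently, with no shuffle.
final class SketchSinglePhaseRunner implements SketchParallelRunner {
  private final List<String> splits;
  private final Predicate<String> processSplit;

  SketchSinglePhaseRunner(List<String> splits, Predicate<String> processSplit) {
    this.splits = splits;
    this.processSplit = processSplit;
  }

  @Override
  public SketchTaskState run() {
    for (String split : splits) {
      if (!processSplit.test(split)) {
        return SketchTaskState.FAILED;
      }
    }
    return SketchTaskState.SUCCESS;
  }
}

// The supervisor only delegates; it does not know which algorithm the runner implements.
final class SketchSupervisor {
  private final SketchParallelRunner runner;

  SketchSupervisor(SketchParallelRunner runner) {
    this.runner = runner;
  }

  SketchTaskState runTask() {
    try {
      return runner.run();
    } catch (Exception e) {
      return SketchTaskState.FAILED; // assumption: a runner exception fails the task
    }
  }
}
```

Swapping in another algorithm means passing a different runner, and the supervisor itself does not change. This matches the javadoc's note that more implementations can be added later.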
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891785 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; Review comment: Good catch! Fixed.
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891803 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask +{ + static final String TYPE = "index_sub"; + + private static final Logger log = new Logger(ParallelIndexSubTask.class); + + private final int numAttempts; + private final ParallelIndexIngestionSpec ingestionSchema; + private final String supervisorTaskId; + private final IndexingServiceClient indexingServiceClient; + private final IndexTaskClientFactory taskClientFactory; + + @JsonCreator + public ParallelIndexSubTask( + // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask + @JsonProperty("id") @Nullable final String id, +
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891856 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSupervisorTask.java ## @@ -0,0 +1,322 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; +import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; +import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.server.security.AuthorizerMapper; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; + +/** + * ParallelIndexSupervisorTask is capable of running multiple subTasks for parallel indexing. This is + * applicable if the input {@link FiniteFirehoseFactory} is splittable. While this task is running, it can submit + * multiple child tasks to overlords. This task succeeds only when all its child tasks succeed; otherwise it fails. 
+ * + * @see ParallelIndexTaskRunner + */ +public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHandler +{ + static final String TYPE = "index_parallel"; + + private static final Logger log = new Logger(ParallelIndexSupervisorTask.class); + + private final ParallelIndexIngestionSpec ingestionSchema; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + private final RowIngestionMetersFactory rowIngestionMetersFactory; + + private ParallelIndexTaskRunner runner; + + @JsonCreator + public ParallelIndexSupervisorTask( + @JsonProperty("id") String id, + @JsonProperty("resource") TaskResource taskResource, + @JsonProperty("spec") ParallelIndexIngestionSpec ingestionSchema, + @JsonProperty("context") Map context, + @JacksonInject @Nullable IndexingServiceClient indexingServiceClient, // null in overlords + @JacksonInject @Nullable ChatHandlerProvider chatHandlerProvider, // null in overlords + @JacksonInject AuthorizerMapper authorizerMapper, + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory + ) + { +super( +getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()), +null, +taskResource, +ingestionSchema.getDataSchema().getDataSource(), +context +); + +this.ingestionSchema = ingestionSchema; + +final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); +if (!(firehoseFactory instanceof FiniteFirehoseFactory)) { + throw new IAE("[%s] should implement FiniteFirehoseFactory", firehoseFactory.getClass().getSimpleName()); +} + +this.baseFirehoseFactory = (FiniteFirehoseFactory) firehoseFactory; +this.indexingServiceClient = indexingServiceClient; +this.chatHandlerProvider = chatHandlerProvider; +this.authorizerMapper = authorizerMapper; +this.rowIngestionMetersFactory = rowIngestionMetersFactory; + +if 
(ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() > 0) { +
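The javadoc quoted above says the supervisor task succeeds only when all of its child tasks succeed and fails otherwise. Below is a minimal sketch of that aggregation rule. It assumes a hypothetical three-state `ChildState` enum, not Druid's `TaskStatus`/`TaskState`. It also ignores any retry handling a real supervisor may do.

```java
import java.util.List;

// Hypothetical child-task states; Druid's real TaskStatus/TaskState types are not used here.
enum ChildState { RUNNING, SUCCESS, FAILED }

final class SupervisorStatusSketch {
  private SupervisorStatusSketch() {}

  /**
   * Returns FAILED as soon as any child has failed, RUNNING if some child is still running,
   * and SUCCESS only when every child has succeeded. Retries are not modeled.
   */
  static ChildState aggregate(List<ChildState> children) {
    boolean anyRunning = false;
    for (ChildState state : children) {
      if (state == ChildState.FAILED) {
        return ChildState.FAILED; // one failed child fails the whole supervisor
      }
      if (state == ChildState.RUNNING) {
        anyRunning = true;
      }
    }
    return anyRunning ? ChildState.RUNNING : ChildState.SUCCESS;
  }
}
```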
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891827 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask +{ + static final String TYPE = "index_sub"; + + private static final Logger log = new Logger(ParallelIndexSubTask.class); + + private final int numAttempts; + private final ParallelIndexIngestionSpec ingestionSchema; + private final String supervisorTaskId; + private final IndexingServiceClient indexingServiceClient; + private final IndexTaskClientFactory taskClientFactory; + + @JsonCreator + public ParallelIndexSubTask( + // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask + @JsonProperty("id") @Nullable final String id, +
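The `ParallelIndexSubTask` javadoc above says that each sub-task generates and pushes segments. It then reports them to the supervisor instead of publishing them itself. The sketch below shows one possible supervisor-side collector for those reports. It is hypothetical: the class name, the method names, and the use of plain string segment ids in place of `DataSegment` are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical supervisor-side collector: each sub-task reports the ids of the segments it
 * pushed, and the supervisor later publishes the union in one step.
 */
final class PushedSegmentCollector {
  // subTaskId -> ids of the segments that sub-task pushed
  private final Map<String, List<String>> reports = new ConcurrentHashMap<>();

  /** A repeated report from the same sub-task replaces the earlier one. */
  void report(String subTaskId, List<String> pushedSegmentIds) {
    reports.put(subTaskId, new ArrayList<>(pushedSegmentIds));
  }

  /** Union of all reported segments, sorted so the publish set is deterministic. */
  List<String> segmentsToPublish() {
    TreeSet<String> all = new TreeSet<>();
    for (List<String> ids : reports.values()) {
      all.addAll(ids);
    }
    return new ArrayList<>(all);
  }
}
```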
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891749 ## File path: indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.druid.indexing.common.Counters; +import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.SortedSet; + +/** + * This action is to find a proper {@link io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of + * the next shard number per {@link Interval} in {@link Counters}. The next shard number is incremented by 1 whenever a + * new {@link SegmentIdentifier} is allocated. 
+ */ +public class CountingSegmentAllocateAction implements TaskAction +{ + private final String dataSource; + private final DateTime timestamp; + private final GranularitySpec granularitySpec; + @JsonDeserialize(keyUsing = IntervalDeserializer.class) + private final Map versions; + + private final SortedSet bucketIntervals; + + @JsonCreator + public CountingSegmentAllocateAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("timestamp") DateTime timestamp, + @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("versions") Map versions + ) + { +this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); +this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp"); +this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec"); +this.versions = Preconditions.checkNotNull(versions, "versions"); + +this.bucketIntervals = Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), "bucketIntervals"); + } + + @JsonProperty + public String getDataSource() + { +return dataSource; + } + + @JsonProperty + public DateTime getTimestamp() + { +return timestamp; + } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { +return granularitySpec; + } + + @JsonProperty + public Map getVersions() + { +return versions; + } + + @Override + public TypeReference getReturnTypeReference() + { +return new TypeReference() +{ +}; + } + + @Override + public SegmentIdentifier perform(Task task, TaskActionToolbox toolbox) + { +Optional maybeInterval = granularitySpec.bucketInterval(timestamp); +if (!maybeInterval.isPresent()) { + throw new ISE("Could not find interval for timestamp [%s]", timestamp); +} + +final Interval interval = maybeInterval.get(); +if (!bucketIntervals.contains(interval)) { + throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec); +} + +final Counters counters = toolbox.getCounters(); Review comment: Oh, thanks for 
finding this! `Counters` is expected to be created per task, not a singleton. Will fix this soon.
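The `CountingSegmentAllocateAction` javadoc describes keeping the next shard number per interval and incrementing it by 1 on each allocation. The reply adds that `Counters` should be per task rather than a singleton. The sketch below combines the two ideas by keying a shard counter on each (task, interval) pair. This is a hypothetical illustration and not Druid's `Counters` class. Intervals are plain strings here to avoid a Joda-Time dependency.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch (not Druid's Counters): the next shard number is tracked per
 * (task, interval) pair, so counts from one task are never shared with another.
 */
final class PerTaskShardCounters {
  // taskId -> interval -> next shard number to hand out
  private final Map<String, Map<String, AtomicInteger>> counters = new ConcurrentHashMap<>();

  /** Returns the current shard number for the interval and increments it by 1. */
  int nextShardNumber(String taskId, String interval) {
    return counters
        .computeIfAbsent(taskId, id -> new ConcurrentHashMap<>())
        .computeIfAbsent(interval, i -> new AtomicInteger())
        .getAndIncrement();
  }

  /** Drops all counters for a finished task. */
  void removeTask(String taskId) {
    counters.remove(taskId);
  }
}
```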
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891665 ## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize + * data and send an HTTP request. 
+ */ +public abstract class IndexTaskClient implements AutoCloseable +{ + public static class NoTaskLocationException extends RuntimeException + { +public NoTaskLocationException(String message) +{ + super(message); +} + } + + public static class TaskNotRunnableException extends RuntimeException + { +public TaskNotRunnableException(String message) +{ + super(message); +} + } + + public static final int MAX_RETRY_WAIT_SECONDS = 10; + + private static final EmittingLogger log = new EmittingLogger(IndexTaskClient.class); + private static final String BASE_PATH = "/druid/worker/v1/chat"; + private static final int MIN_RETRY_WAIT_SECONDS = 2; + private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5; + + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + private final TaskInfoProvider taskInfoProvider; + private final Duration httpTimeout; + private final RetryPolicyFactory retryPolicyFactory; + private final ListeningExecutorService executorService; + + public IndexTaskClient( + HttpClient httpClient, + ObjectMapper objectMapper, + TaskInfoProvider taskInfoProvider, + Duration httpTimeout, + String callerId, + int numThreads, + long numRetries + ) + { +this.httpClient = httpClient; +this.objectMapper = objectMapper; +this.taskInfoProvider = taskInfoProvider; +this.httpTimeout = httpTimeout; +this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries); +this.executorService = MoreExecutors.listeningDecorator( +Execs.multiThreaded( +numThreads, +StringUtils.format( +"IndexTaskClient-%s-%%d", +callerId +) +) +); + } + + private static RetryPolicyFactory initializeRetryPolicyFactory(long numRetries) + { +// Retries [numRetries] times before giving up; this should be set long enough to handle any temporary +// unresponsiveness such as network issues, if
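The quoted constructor builds a retry policy from `numRetries`, and the class defines `MIN_RETRY_WAIT_SECONDS = 2` and `MAX_RETRY_WAIT_SECONDS = 10`. The sketch below uses those two constants for a simple capped exponential backoff. It assumes the wait doubles on each attempt. That doubling is an assumption for illustration, and Druid's actual `RetryPolicy` may behave differently (for example, by adding jitter).

```java
/**
 * Hypothetical capped exponential backoff using the constants from the quoted hunk.
 * This is NOT Druid's RetryPolicy.
 */
final class BackoffSketch {
  static final int MIN_RETRY_WAIT_SECONDS = 2;
  static final int MAX_RETRY_WAIT_SECONDS = 10;

  private final long maxRetries;
  private long retriesSoFar = 0;

  BackoffSketch(long maxRetries) {
    this.maxRetries = maxRetries;
  }

  /** Returns the next wait in seconds, or -1 once maxRetries retries have been used up. */
  long nextDelaySeconds() {
    if (retriesSoFar >= maxRetries) {
      return -1; // give up
    }
    // 2, 4, 8, then capped at 10; the shift distance is bounded to avoid overflow.
    long delay = (long) MIN_RETRY_WAIT_SECONDS << Math.min(retriesSoFar, 30);
    retriesSoFar++;
    return Math.min(delay, MAX_RETRY_WAIT_SECONDS);
  }
}
```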
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891629 ## File path: indexing-service/src/main/java/io/druid/indexing/common/Counters.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.google.common.util.concurrent.AtomicDouble; Review comment: Yes, only the Guava (`com.google.common`) one is available.
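For context on the reply, the JDK's `java.util.concurrent.atomic` package has no `AtomicDouble`. It does provide `DoubleAdder` and `DoubleAccumulator`, which cover sums and accumulations only. If a dependency-free alternative were ever needed, one common technique is to store the raw IEEE-754 bits in an `AtomicLong` and update them with a compare-and-set loop. Guava's `AtomicDouble` follows the same general bits-in-a-long approach. The sketch below is only an illustration and is not meant as a drop-in replacement for Guava's class.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Minimal atomic double that stores the raw double bits in an AtomicLong. Illustrative only. */
final class BitsAtomicDouble {
  private final AtomicLong bits;

  BitsAtomicDouble(double initialValue) {
    bits = new AtomicLong(Double.doubleToRawLongBits(initialValue));
  }

  double get() {
    return Double.longBitsToDouble(bits.get());
  }

  /** Atomically adds delta using a compare-and-set retry loop and returns the new value. */
  double addAndGet(double delta) {
    while (true) {
      long currentBits = bits.get();
      double next = Double.longBitsToDouble(currentBits) + delta;
      if (bits.compareAndSet(currentBits, Double.doubleToRawLongBits(next))) {
        return next;
      }
    }
  }
}
```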
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891657 ## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize + * data and send an HTTP request. 
+ */ +public abstract class IndexTaskClient implements AutoCloseable +{ + public static class NoTaskLocationException extends RuntimeException + { +public NoTaskLocationException(String message) +{ + super(message); +} + } + + public static class TaskNotRunnableException extends RuntimeException + { +public TaskNotRunnableException(String message) +{ + super(message); +} + } + + public static final int MAX_RETRY_WAIT_SECONDS = 10; + + private static final EmittingLogger log = new EmittingLogger(IndexTaskClient.class); + private static final String BASE_PATH = "/druid/worker/v1/chat"; + private static final int MIN_RETRY_WAIT_SECONDS = 2; + private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5; + + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + private final TaskInfoProvider taskInfoProvider; + private final Duration httpTimeout; + private final RetryPolicyFactory retryPolicyFactory; + private final ListeningExecutorService executorService; + + public IndexTaskClient( + HttpClient httpClient, + ObjectMapper objectMapper, + TaskInfoProvider taskInfoProvider, + Duration httpTimeout, + String callerId, + int numThreads, + long numRetries + ) + { +this.httpClient = httpClient; +this.objectMapper = objectMapper; +this.taskInfoProvider = taskInfoProvider; +this.httpTimeout = httpTimeout; +this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries); +this.executorService = MoreExecutors.listeningDecorator( +Execs.multiThreaded( +numThreads, +StringUtils.format( +"IndexTaskClient-%s-%%d", +callerId +) +) +); + } + + private static RetryPolicyFactory initializeRetryPolicyFactory(long numRetries) + { +// Retries [numRetries] times before giving up; this should be set long enough to handle any temporary +// unresponsiveness such as network issues, if
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891707 ## File path: indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.druid.indexing.common.Counters; +import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.SortedSet; + +/** + * This action is to find a proper {@link io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of + * the next shard number per {@link Interval} in {@link Counters}. The next shard number is incremented by 1 whenever a + * new {@link SegmentIdentifier} is allocated. 
+ */ +public class CountingSegmentAllocateAction implements TaskAction +{ + private final String dataSource; + private final DateTime timestamp; + private final GranularitySpec granularitySpec; + @JsonDeserialize(keyUsing = IntervalDeserializer.class) + private final Map versions; + + private final SortedSet bucketIntervals; + + @JsonCreator + public CountingSegmentAllocateAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("timestamp") DateTime timestamp, + @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("versions") Map versions + ) + { +this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); +this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp"); +this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec"); +this.versions = Preconditions.checkNotNull(versions, "versions"); + +this.bucketIntervals = Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), "bucketIntervals"); + } + + @JsonProperty + public String getDataSource() + { +return dataSource; + } + + @JsonProperty + public DateTime getTimestamp() + { +return timestamp; + } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { +return granularitySpec; + } + + @JsonProperty + public Map getVersions() + { +return versions; + } + + @Override + public TypeReference getReturnTypeReference() + { +return new TypeReference() Review comment: Hmm, what is the benefit of making this a static final variable here?
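Some background on the question may help. Jackson's `TypeReference` uses the "super type token" pattern: an anonymous subclass records its generic type argument, and the constructor reads it back through reflection. The anonymous class is compiled only once either way. As far as the sketch below shows, the only difference with a `static final` field is that a new instance is allocated and the reflective lookup repeats on every call. The sketch uses a hypothetical stdlib-only stand-in named `TypeTokenSketch` rather than Jackson itself.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for Jackson's TypeReference (super type token pattern).
abstract class TypeTokenSketch<T> {
  private final Type type;

  protected TypeTokenSketch() {
    // Capture T from the anonymous subclass's generic superclass; this reflection runs per instance.
    Type superclass = getClass().getGenericSuperclass();
    this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
  }

  Type getType() {
    return type;
  }
}

final class ReturnTypeHolder {
  private ReturnTypeHolder() {}

  // Option A: a single shared instance, created once when the class is initialized.
  static final TypeTokenSketch<List<String>> SHARED = new TypeTokenSketch<List<String>>() {};

  static TypeTokenSketch<List<String>> shared() {
    return SHARED;
  }

  // Option B: a new instance (and a new reflective lookup) on every call.
  static TypeTokenSketch<List<String>> fresh() {
    return new TypeTokenSketch<List<String>>() {};
  }
}
```

Both options describe the same type, so the choice mainly comes down to avoiding a small per-call allocation.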
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891619 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java ## @@ -50,5 +53,8 @@ @Override public void configure(Binder binder) { +binder.bind(new TypeLiteral>(){}) Review comment: Good catch. Fixed.
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891640 ## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize Review comment: `IndexTaskClient` can theoretically be used by any node, but it is currently used by both overlords and middleManagers (more precisely, peons).
In the Kafka indexing service, the supervisor (which runs on an overlord) uses this to communicate with KafkaIndexTasks. In native batch indexing, the supervisorTask (which runs on a peon) uses this to communicate with its subTasks. I think there should be no significant issue here, because each subTask calls a REST API of the supervisorTask only once before it finishes.
> If that is true what are the considerations for number of http server threads the overlord needs compared to settings here for large clusters?
Maybe it's a problem. I'm not sure about this. @gianm any thoughts?
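Every `IndexTaskClient` discussed in this thread owns a fixed-size pool of `numThreads` threads. The quoted constructor names those threads through a two-step format. First, `StringUtils.format("IndexTaskClient-%s-%%d", callerId)` produces a pattern such as `IndexTaskClient-<callerId>-%d`. That pattern is then used to number each thread. The sketch below reproduces the same two-step formatting using only the JDK. The class and method names are hypothetical, and it does not use Druid's `Execs` or `StringUtils`. Making the threads daemon threads is a choice for this sketch only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stdlib-only sketch of a named, fixed-size client pool. */
final class ClientPoolSketch {
  private ClientPoolSketch() {}

  /** "%%d" survives the first format as "%d", so the result is itself a format string. */
  static String nameFormat(String callerId) {
    return String.format("IndexTaskClient-%s-%%d", callerId);
  }

  static ExecutorService newPool(String callerId, int numThreads) {
    // Note: a callerId containing '%' would break the second format step.
    final String format = nameFormat(callerId);
    final AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, String.format(format, counter.getAndIncrement()));
      t.setDaemon(true); // sketch choice: do not keep the JVM alive
      return t;
    };
    return Executors.newFixedThreadPool(numThreads, factory);
  }
}
```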
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891627 ## File path: indexing-service/src/main/java/io/druid/indexing/appenderator/SegmentAllocateActionGenerator.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.indexing.appenderator; + +import io.druid.data.input.InputRow; +import io.druid.indexing.common.actions.TaskAction; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; + +public interface SegmentAllocateActionGenerator +{ + TaskAction generate( Review comment: Added.
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891613 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java ## @@ -362,317 +288,70 @@ public boolean setEndOffsets( log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize); try { - final FullResponseHolder response = submitRequest( + final FullResponseHolder response = submitJsonRequest( id, HttpMethod.POST, "offsets/end", StringUtils.format("finish=%s", finalize), - jsonMapper.writeValueAsBytes(endOffsets), + serialize(endOffsets), true ); - return response.getStatus().getCode() / 100 == 2; + return isSuccess(response); } catch (NoTaskLocationException e) { return false; } catch (IOException e) { - throw new RuntimeException(e); + throw Throwables.propagate(e); } } public ListenableFuture stopAsync(final String id, final boolean publish) { -return executorService.submit( -new Callable() -{ - @Override - public Boolean call() - { -return stop(id, publish); - } -} -); +return doAsync(() -> stop(id, publish)); } public ListenableFuture resumeAsync(final String id) { -return executorService.submit( -new Callable() -{ - @Override - public Boolean call() - { -return resume(id); - } -} -); +return doAsync(() -> resume(id)); Review comment: Yes, we can use CompletableFuture instead. Probably related to https://github.com/apache/incubator-druid/issues/5415.
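The reply agrees that `CompletableFuture` could replace the Guava `ListenableFuture` returned through `doAsync`. A minimal JDK-only sketch of what such a helper might look like, assuming a dedicated executor owned by the client; the class name and the `stop`/`resume` stand-ins are hypothetical and are not Druid's actual `KafkaIndexTaskClient` code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch: a task client whose async methods return CompletableFuture
// instead of Guava's ListenableFuture.
class CompletableTaskClientSketch
{
  // Daemon threads so that an unclosed client does not keep the JVM alive.
  private final ExecutorService executorService = Executors.newFixedThreadPool(
      2,
      runnable -> {
        Thread thread = new Thread(runnable, "task-client-sketch");
        thread.setDaemon(true);
        return thread;
      }
  );

  // Hypothetical stand-ins for the blocking HTTP calls; a real client would contact the task here.
  boolean stop(String id, boolean publish)
  {
    return !id.isEmpty();
  }

  boolean resume(String id)
  {
    return !id.isEmpty();
  }

  // Runs a blocking call on the client's executor and exposes the result as a CompletableFuture.
  private <T> CompletableFuture<T> doAsync(Supplier<T> call)
  {
    return CompletableFuture.supplyAsync(call, executorService);
  }

  CompletableFuture<Boolean> stopAsync(String id, boolean publish)
  {
    return doAsync(() -> stop(id, publish));
  }

  CompletableFuture<Boolean> resumeAsync(String id)
  {
    return doAsync(() -> resume(id));
  }

  void close()
  {
    executorService.shutdown();
  }
}
```

Callers could then chain follow-up work with `thenApply`/`thenCompose` or block with `join()`, without depending on Guava's future types in the client's public signatures.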
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891602 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java ## @@ -362,317 +288,70 @@ public boolean setEndOffsets( log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize); try { - final FullResponseHolder response = submitRequest( + final FullResponseHolder response = submitJsonRequest( id, HttpMethod.POST, "offsets/end", StringUtils.format("finish=%s", finalize), - jsonMapper.writeValueAsBytes(endOffsets), + serialize(endOffsets), true ); - return response.getStatus().getCode() / 100 == 2; + return isSuccess(response); } catch (NoTaskLocationException e) { return false; } catch (IOException e) { - throw new RuntimeException(e); + throw Throwables.propagate(e); Review comment: Good point. Changed.
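The same hunk replaces the inline `response.getStatus().getCode() / 100 == 2` check with an `isSuccess(response)` helper. Integer division by 100 maps every status from 200 through 299 to 2, so the check accepts any 2xx response. A minimal sketch on a raw status code, assuming the helper keeps exactly these semantics (the class here is hypothetical):

```java
// Hypothetical sketch of the 2xx check that the diff moves into an isSuccess(...) helper.
final class StatusCheckSketch
{
  private StatusCheckSketch()
  {
  }

  // Integer division drops the last two digits, so 200..299 all map to 2.
  static boolean isSuccess(int statusCode)
  {
    return statusCode / 100 == 2;
  }
}
```

Centralizing the check keeps endpoints consistent, for example treating `204 No Content` as a success rather than accepting `200 OK` only.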
[GitHub] asdf2014 commented on a change in pull request #5996: Fix NPE while handling CheckpointNotice in KafkaSupervisor
asdf2014 commented on a change in pull request #5996: Fix NPE while handling CheckpointNotice in KafkaSupervisor URL: https://github.com/apache/incubator-druid/pull/5996#discussion_r201891545 ## File path: extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java ## @@ -240,11 +239,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint( - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousCheckPoint, - @Nullable DataSourceMetadata currentCheckPoint - ) + public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint) Review comment: I should see the latest version of the code when I visit [https://github.com/apache/incubator-druid/pull/5996/files](https://github.com/apache/incubator-druid/pull/5996/files), right?
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891581 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java ## @@ -180,18 +113,17 @@ public boolean resume(final String id) if (response.getStatus().equals(HttpResponseStatus.OK)) { log.info("Task [%s] paused successfully", id); -return jsonMapper.readValue(response.getContent(), new TypeReference>() +return deserialize(response.getContent(), new TypeReference>() Review comment: What do you mean? The returned object is a map that is used by `KafkaSupervisor`.
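The `new TypeReference<...>() {` expression passed to `deserialize` creates an anonymous subclass. Because that subclass records its generic superclass, the full map type survives erasure and can be read back via reflection. The sketch below illustrates this "super type token" idea with only the JDK; `TypeTokenSketch` is a hypothetical name, not Jackson's class, and `Map<Integer, Long>` in the usage is only an example type, not necessarily the one used in `KafkaIndexTaskClient`:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical JDK-only illustration of the "super type token" pattern.
abstract class TypeTokenSketch<T>
{
  private final Type type;

  protected TypeTokenSketch()
  {
    // For `new TypeTokenSketch<Map<Integer, Long>>() {}`, the anonymous subclass's generic
    // superclass is TypeTokenSketch<Map<Integer, Long>>, so its type argument is still available.
    Type superclass = getClass().getGenericSuperclass();
    if (!(superclass instanceof ParameterizedType)) {
      throw new IllegalStateException("Create TypeTokenSketch as an anonymous subclass with a type argument");
    }
    this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
  }

  Type getType()
  {
    return type;
  }
}
```

For example, `new TypeTokenSketch<Map<Integer, Long>>() {}.getType()` yields a `ParameterizedType` whose raw type is `Map` and whose arguments are `Integer` and `Long`, which is the information a deserializer needs to build a correctly typed map.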
[GitHub] asdf2014 commented on a change in pull request #5996: Fix NPE while handling CheckpointNotice in KafkaSupervisor
asdf2014 commented on a change in pull request #5996: Fix NPE while handling CheckpointNotice in KafkaSupervisor URL: https://github.com/apache/incubator-druid/pull/5996#discussion_r201890785 ## File path: extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java ## @@ -240,11 +239,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint( - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousCheckPoint, - @Nullable DataSourceMetadata currentCheckPoint - ) + public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint) Review comment: Um... I am sure. It still exists in the latest version of the code. :sweat_smile:
```java
public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
```
should be
```java
public void checkpoint(
    int taskGroupId,
    DataSourceMetadata previousCheckPoint,
    DataSourceMetadata currentCheckPoint
)
```
[GitHub] jihoonson commented on a change in pull request #5996: Fix NPE while handling CheckpointNotice in KafkaSupervisor
jihoonson commented on a change in pull request #5996: Fix NPE while handling CheckpointNotice in KafkaSupervisor URL: https://github.com/apache/incubator-druid/pull/5996#discussion_r201887446 ## File path: extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java ## @@ -240,11 +239,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint( - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousCheckPoint, - @Nullable DataSourceMetadata currentCheckPoint - ) + public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint) Review comment: @asdf2014 thanks for the review! Maybe you saw an old commit. I've resolved all code style issues.
[GitHub] asdf2014 commented on issue #5980: Various changes about a few coding specifications
asdf2014 commented on issue #5980: Various changes about a few coding specifications URL: https://github.com/apache/incubator-druid/pull/5980#issuecomment-404354126 @leventov You are welcome. The amount of code is large and tedious to review, so I may have to trouble you. :+1:
[GitHub] jihoonson commented on issue #5980: Various changes about a few coding specifications
jihoonson commented on issue #5980: Various changes about a few coding specifications URL: https://github.com/apache/incubator-druid/pull/5980#issuecomment-404353255 @asdf2014 thanks. I'll take another look.
[GitHub] jihoonson edited a comment on issue #5492: Native parallel batch indexing without shuffle
jihoonson edited a comment on issue #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#issuecomment-403671944 > sorry for the slow review, was moving last week No worries about the review speed. It's always better late than never. Hope you moved to some nice place.
[incubator-druid] branch master updated: show that flatten will also work with avro extension (#5874)
This is an automated email from the ASF dual-hosted git repository. fjy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git The following commit(s) were added to refs/heads/master by this push: new 5f78a33 show that flatten will also work with avro extension (#5874) 5f78a33 is described below commit 5f78a333ad0455ad377b1eac0e119fa3594a9d10 Author: Caroline1000 AuthorDate: Wed Jul 11 16:47:03 2018 -0700 show that flatten will also work with avro extension (#5874) * show that flatten will also work with avro extension * fix url --- docs/content/ingestion/flatten-json.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/ingestion/flatten-json.md b/docs/content/ingestion/flatten-json.md index 8c76974..eddfa96 100644 --- a/docs/content/ingestion/flatten-json.md +++ b/docs/content/ingestion/flatten-json.md @@ -9,7 +9,7 @@ layout: doc_page | useFieldDiscovery | Boolean | If true, interpret all fields with singular values (not a map or list) and flat lists (lists of singular values) at the root level as columns. | no (default == true) | | fields | JSON Object array | Specifies the fields of interest and how they are accessed | no (default == []) | -Defining the JSON Flatten Spec allows nested JSON fields to be flattened during ingestion time. Only the JSON ParseSpec supports flattening. +Defining the JSON Flatten Spec allows nested JSON fields to be flattened during ingestion time. Only parseSpecs of types "json" or ["avro"](../development/extensions-core/avro.html) support flattening. 'fields' is a list of JSON Objects, describing the field names and how the fields are accessed:
[GitHub] fjy closed pull request #5874: show that flatten will also work with avro extension
fjy closed pull request #5874: show that flatten will also work with avro extension URL: https://github.com/apache/incubator-druid/pull/5874 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/content/ingestion/flatten-json.md b/docs/content/ingestion/flatten-json.md index 8c76974cc6a..eddfa96acb2 100644 --- a/docs/content/ingestion/flatten-json.md +++ b/docs/content/ingestion/flatten-json.md @@ -9,7 +9,7 @@ layout: doc_page | useFieldDiscovery | Boolean | If true, interpret all fields with singular values (not a map or list) and flat lists (lists of singular values) at the root level as columns. | no (default == true) | | fields | JSON Object array | Specifies the fields of interest and how they are accessed | no (default == []) | -Defining the JSON Flatten Spec allows nested JSON fields to be flattened during ingestion time. Only the JSON ParseSpec supports flattening. +Defining the JSON Flatten Spec allows nested JSON fields to be flattened during ingestion time. Only parseSpecs of types "json" or ["avro"](../development/extensions-core/avro.html) support flattening. 'fields' is a list of JSON Objects, describing the field names and how the fields are accessed:
[GitHub] jon-wei closed pull request #5988: Coordinator fix exception caused by additional logging
jon-wei closed pull request #5988: Coordinator fix exception caused by additional logging URL: https://github.com/apache/incubator-druid/pull/5988 diff --git a/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java index d794b82cea5..aa831e7f043 100644 --- a/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java @@ -386,13 +386,13 @@ private void entryRemoved(String path) ); return; } - actionCompleted(); log.info( "Server[%s] done processing %s of segment [%s]", basePath, currentlyProcessing.getType() == LOAD ? "load" : "drop", path ); + actionCompleted(); } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 9dcee89bece..063063a92bd 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -247,7 +247,6 @@ private void assignReplicas( createLoadQueueSizeLimitingPredicate(params), segment ); - log.info("Assigned %d replicas in tier [%s]", numAssigned, tier); stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned); } }
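The `CuratorLoadQueuePeon` hunk moves `actionCompleted()` after the `log.info(...)` call that dereferences `currentlyProcessing`. Assuming, as the PR title suggests, that completing an action clears `currentlyProcessing`, the original order turns the new log line into a `NullPointerException`. A simplified, hypothetical model of both orderings (not the actual Druid class):

```java
// Hypothetical, simplified model of a load queue peon's in-flight bookkeeping.
final class LoadQueueOrderingSketch
{
  enum ActionType
  {
    LOAD, DROP
  }

  static final class SegmentHolder
  {
    private final ActionType type;

    SegmentHolder(ActionType type)
    {
      this.type = type;
    }

    ActionType getType()
    {
      return type;
    }
  }

  private SegmentHolder currentlyProcessing;

  LoadQueueOrderingSketch(SegmentHolder holder)
  {
    this.currentlyProcessing = holder;
  }

  // Assumption: finishing an action releases the in-flight holder.
  private void actionCompleted()
  {
    currentlyProcessing = null;
  }

  private String describe(String path)
  {
    return String.format(
        "done processing %s of segment [%s]",
        currentlyProcessing.getType() == ActionType.LOAD ? "load" : "drop",
        path
    );
  }

  // Order after the fix: build the log message first, then complete the action.
  String entryRemovedFixed(String path)
  {
    String message = describe(path);
    actionCompleted();
    return message;
  }

  // Order before the fix: currentlyProcessing is already null when the message is built.
  String entryRemovedOriginal(String path)
  {
    actionCompleted();
    return describe(path);
  }
}
```

The general rule the fix follows: read any state needed for logging before calling the method that releases it.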
[GitHub] jihoonson closed pull request #5995: fix link to query-context in broker config doc
jihoonson closed pull request #5995: fix link to query-context in broker config doc URL: https://github.com/apache/incubator-druid/pull/5995 diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index edddca824db..36fc0378d2b 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -45,7 +45,7 @@ Druid uses Jetty to serve HTTP requests. |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| |`druid.broker.http.unusedConnectionTimeout`|The timeout for idle connections in connection pool. This timeout should be less than `druid.broker.http.readTimeout`. Set this timeout = ~90% of `druid.broker.http.readTimeout`|`PT4M`| -|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE| +|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](../querying/query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE| |`druid.server.http.maxRequestHeaderSize`|Maximum size of a request header in bytes. Larger headers consume more memory and can make a server more vulnerable to denial of service attacks. |8 * 1024|
Re: License headers and NOTICE file
Do your best with the NOTICE file. We will scrutinize it during the release process. I know it seems impolite not to mention contributors and copied code in the NOTICE, but there is a good reason to keep its contents absolutely minimal. Downstream projects are required to reproduce the NOTICE file; therefore everything we put in it places a burden on those projects. Lastly, note that binary artifacts often include much more than a source release. If so, their NOTICE files may need to include extra things. To keep things simple, we strongly recommend that the first Apache release is source only. Julian > On Jul 11, 2018, at 1:00 PM, Gian Merlino wrote: > > I am looking at sorting out our license headers and NOTICE file. > > For license headers the only real question I have at this point is > https://github.com/apache/incubator-druid/issues/5835#issuecomment-404257393. > If anyone with experience in this sort of thing could chime in then that > would be very useful. > > For NOTICE, it looks like > http://www.apache.org/dev/licensing-howto.html#overview-of-files governs > what the NOTICE file should contain. Here is our current one: > https://github.com/apache/incubator-druid/blob/master/NOTICE. It has the > following features, > > 1) Copyright notices up top for three major contributors. > 2) A notice for each project where we've copied code directly into the > Druid codebase. They're all Apache licensed. > > From reading the licensing-howto it seems like the file needs some tweaks. > A bolded principle in the guide is "Do not add anything to NOTICE which is > not legally required." The copyright notices for major contributors don't > seem necessary given they are all covered by SGA / CLAs. The howto also > says "If the dependency supplies a NOTICE file, its contents must be > analyzed and the relevant portions bubbled up into the top-level NOTICE > file." We haven't been doing that -- we have just been listing the projects > themselves.
> > For projects where we've copied code, and for which those projects don't > have a NOTICE, should we remove them completely from our NOTICE? And I > suppose we should double check that we have source files marked > appropriately in cases where they're copied from another project. > > Is there anyone with more experience in writing NOTICE files that could > chime in with thoughts please?
[GitHub] Caroline1000 opened a new pull request #5995: fix link to query-context in broker config doc
Caroline1000 opened a new pull request #5995: fix link to query-context in broker config doc URL: https://github.com/apache/incubator-druid/pull/5995
Re: Druid 0.12.2-rc1 vote
Well, it's never good if a WTH?! message actually gets logged. They are usually meant to be things that should "never" happen. I am ok with holding off 0.12.2-rc1 until this fix is in. On Wed, Jul 11, 2018 at 1:04 PM Jihoon Son wrote: > Thanks everyone for voting. > > Unfortunately, I found another bug in the Kafka indexing service ( > https://github.com/apache/incubator-druid/issues/5992). I think it's worth > including in 0.12.2. > I'm currently working on that issue and can probably finish at least by > this week. > > Can we add it to 0.12.2 and vote again once a patch to fix is merged? > > Jihoon > > On Wed, Jul 11, 2018 at 10:02 AM Jonathan Wei wrote: > > > +1 > > > > On Wed, Jul 11, 2018 at 9:44 AM, Gian Merlino wrote: > > > > > +1 from me too! > > > > > > On Wed, Jul 11, 2018 at 7:28 AM Charles Allen > > wrote: > > > > > > > That is very helpful, thank you! > > > > > > > > +1 for continuing with 0.12.2-RC1 > > > > > > > > On Tue, Jul 10, 2018 at 6:51 PM Clint Wylie > > > wrote: > > > > > > > > > Heya, sorry for the delay (and missing the sync, I'll try to get > > better > > > > > about showing up). I've fixed a handful of coordinator bugs post > > 0.12.0 > > > > > (and > > > > > not backported to 0.12.1), some of these issues go far back, some > > back > > > to > > > > > when segment assignment priority for different tiers of historicals > > was > > > > > introduced, some are just some oddities on the behavior of the > > balancer > > > > > that I am unsure when were introduced.
This is the complete list of > > > fixes > > > > > that are currently in 0.12.2 afaik, with a small description (see > PRs > > > and > > > > > associated issues for more details) > > > > > > > > > > https://github.com/apache/incubator-druid/pull/5528 fixed an issue > > > that > > > > > movement did not drop the segment from the server the segment was > > being > > > > > moved from (this one goes way back, to batch segment > > announcements) > > > > > > > > > > https://github.com/apache/incubator-druid/pull/5529 changed > behavior > > > of > > > > > drop to use the balancer to choose where to drop segments from, > based > > > on > > > > > behavior observed caused by the issue of 5528 > > > > > > > > > > https://github.com/apache/incubator-druid/pull/5532 fixes an issue > > > where > > > > > primary assignment during load rule processing would assign an > > > > unavailable > > > > > segment to every server with capacity until at least 1 historical > had > > > the > > > > > segment (and drop it from all the others if they all loaded at the > > same > > > > > time), choking load queues from doing useful things > > > > > > > > > > https://github.com/apache/incubator-druid/pull/ fixed a way > for > > > http > > > > > based coordinator to get stuck loading or dropping segments and a > > > > companion > > > > > PR that fixed a lambda that wasn't friendly to older jvm versions > > > > > https://github.com/apache/incubator-druid/pull/5591 > > > > > > > > > > https://github.com/apache/incubator-druid/pull/5888 makes > balancing > > > > honor > > > > > a > > > > > load rule max load queue depth setting to help prevent movement > from > > > > > starving loading > > > > > > > > > > https://github.com/apache/incubator-druid/pull/5928 doesn't really > > fix > > > > > anything, just does an early return to avoid doing pointless work > > > > > > > > > > Additionally, there are a couple of pairs of PRs that are not > > currently > > > > in > > > > > 0.12.2:
https://github.com/druid-io/druid/pull/5927 and > > > > > https://github.com/apache/incubator-druid/pull/5929 and their > > > respective > > > > > fixes which have yet to be merged, but have been performing well on > > our > > > > > test cluster, https://github.com/apache/incubator-druid/pull/5987 > > and > > > > > https://github.com/apache/incubator-druid/pull/5988. One of them > > makes > > > > > balancing behave in a way more consistent with expectations by > always > > > > > trying to move maxSegmentsToMove and more correctly tracking what > the > > > > > balancer is doing, and one just adds better logging (without much > > extra > > > > log > > > > > volume) due to frustrations I had chasing down all these other > > issues. > > > > Both > > > > > of these were slated for 0.12.2 but were pulled out because of the > > > issues > > > > > (which the open PRs fix afaict). I would be in favor of sliding > them > > in > > > > > there, pending review of the fixes, but understand if they won't > make > > > the > > > > > cut since they may fall a bit more on the cosmetic side of > things. > > > I'm > > > > > pretty happy with the state of things on our test cluster right now, > > but > > > > > without these 4 patches things should still be operating more > > correctly > > > > > than they were before, just the differences being with balancing > > moving > > > > > somewhere between 0 and max, and less useful logging making future > > > issues > > > > > (which I have no doubts still lurk)
[GitHub] jihoonson commented on issue #5994: Automatic cleanup druid_tasks table of metastore
jihoonson commented on issue #5994: Automatic cleanup druid_tasks table of metastore URL: https://github.com/apache/incubator-druid/issues/5994#issuecomment-404289731 Related to https://github.com/apache/incubator-druid/issues/5979#issuecomment-404050959.
[GitHub] jihoonson commented on issue #5979: Kafka Indexing Service lagging every hour
jihoonson commented on issue #5979: Kafka Indexing Service lagging every hour URL: https://github.com/apache/incubator-druid/issues/5979#issuecomment-404289501 @erankor thanks! I raised https://github.com/apache/incubator-druid/issues/5994.
[GitHub] jihoonson opened a new issue #5994: Automatic cleanup druid_tasks table of metastore
jihoonson opened a new issue #5994: Automatic cleanup druid_tasks table of metastore URL: https://github.com/apache/incubator-druid/issues/5994 The `druid_tasks` table stores all tasks submitted to the overlords. Since it is never cleaned up automatically, it keeps growing as more tasks are submitted until someone tidies it up manually. So it makes sense to periodically delete old entries from that table, as the coordinator does for old segments. I think it also makes sense to delete old entries if the `druid.indexer.logs.kill.enabled` option is set, because deleting task logs usually means you don't want to check those tasks anymore.
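A minimal sketch of the periodic cleanup proposed in this issue, using an in-memory list as a stand-in for the `druid_tasks` table. The `TaskRow` fields (`createdDate`, `active`), the retention period, and the rule of keeping active tasks regardless of age are assumptions for illustration, not Druid's implementation or schema:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of retention-based cleanup for a task metadata table.
final class TaskTableCleanupSketch
{
  static final class TaskRow
  {
    final String id;
    final Instant createdDate;
    final boolean active;

    TaskRow(String id, Instant createdDate, boolean active)
    {
      this.id = id;
      this.createdDate = createdDate;
      this.active = active;
    }
  }

  private TaskTableCleanupSketch()
  {
  }

  // Deletes finished tasks created before (now - retention); returns how many rows were removed.
  static int deleteOldEntries(List<TaskRow> table, Instant now, Duration retention)
  {
    Instant cutoff = now.minus(retention);
    int before = table.size();
    table.removeIf(row -> !row.active && row.createdDate.isBefore(cutoff));
    return before - table.size();
  }

  // Runs the cleanup repeatedly on the given executor. The list must be safe for concurrent
  // access (e.g. Collections.synchronizedList) because the scheduler thread mutates it.
  static ScheduledFuture<?> schedule(
      ScheduledExecutorService exec,
      List<TaskRow> table,
      Duration retention,
      Duration period
  )
  {
    return exec.scheduleWithFixedDelay(
        () -> deleteOldEntries(table, Instant.now(), retention),
        period.toMillis(),
        period.toMillis(),
        TimeUnit.MILLISECONDS
    );
  }
}
```

Against a real metadata store, the same predicate would presumably become a `DELETE` filtered on the creation date and task state; tying it to `druid.indexer.logs.kill.enabled`, as suggested above, would let task rows and task logs share one retention policy.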
[GitHub] erankor commented on issue #5979: Kafka Indexing Service lagging every hour
erankor commented on issue #5979: Kafka Indexing Service lagging every hour URL: https://github.com/apache/incubator-druid/issues/5979#issuecomment-404285477 Thanks @jihoonson. Regarding 2, as you wrote: I just went over all tables and looked more closely at the ones with a non-negligible number of rows, and I don't have any issue with this table. However, I will still add a cron job to delete records from druid_tasks & tasklogs, as we're using a very weak MySQL instance and I'd rather keep all tables as small as possible.
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201777132 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java ## @@ -0,0 +1,624 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputSplit; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status; +import io.druid.indexing.common.task.TaskMonitor.MonitorEntry; +import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent; +import io.druid.indexing.common.task.TaskMonitor.TaskHistory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.appenderator.UsedSegmentChecker; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlers; +import io.druid.server.security.Action; +import io.druid.server.security.AuthorizerMapper; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; 
+import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and + * monitor multiple {@link ParallelIndexSubTask}s. + * + * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As + * a result, this task can't be used for perfect rollup. + */ +public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner, ChatHandler +{ + private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class); + + private final String taskId; + private final String groupId; + private final ParallelIndexIngestionSpec ingestionSchema; + private final Map context; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final int maxNumTasks; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + + private final BlockingQueue> taskCompleteEvents = + new LinkedBlockingDeque<>(); + + //
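The class Javadoc quoted above describes a single-phase design: the supervisor runs one sub-task per input split and collects what each sub-task pushed, with no shuffle of intermediate data between sub-tasks. Below is a minimal JDK-only sketch of that control flow under stated assumptions. `SinglePhaseSketch` and `SubTaskResult` are hypothetical names. A local `ExecutorService` stands in for submitting sub-tasks to the Overlord, and plain strings stand in for splits and segments.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical completion event: which split a sub-task processed, whether it
// succeeded, and the ids of the segments it pushed (strings stand in for DataSegment).
final class SubTaskResult {
  final String split;
  final boolean success;
  final List<String> pushedSegments;

  SubTaskResult(String split, boolean success, List<String> pushedSegments) {
    this.split = split;
    this.success = success;
    this.pushedSegments = pushedSegments;
  }
}

final class SinglePhaseSketch {
  /**
   * Runs one sub-task per split, waits for one completion event per split, and
   * returns all pushed segments if every sub-task succeeded, or null otherwise.
   */
  static List<String> run(List<String> splits, Function<String, SubTaskResult> subTask)
      throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, Math.min(4, splits.size())));
    BlockingQueue<SubTaskResult> completeEvents = new LinkedBlockingQueue<>();
    try {
      for (String split : splits) {
        executor.submit(() -> {
          SubTaskResult result;
          try {
            result = subTask.apply(split);
          } catch (RuntimeException e) {
            // A crashed sub-task is reported as a failure instead of being lost.
            result = new SubTaskResult(split, false, Collections.emptyList());
          }
          completeEvents.add(result);
        });
      }
      List<String> allSegments = new ArrayList<>();
      boolean allSucceeded = true;
      for (int i = 0; i < splits.size(); i++) {
        SubTaskResult result = completeEvents.take(); // blocks until any sub-task finishes
        if (result.success) {
          allSegments.addAll(result.pushedSegments);
        } else {
          allSucceeded = false;
        }
      }
      // No shuffle: the output is simply the union of what each sub-task pushed.
      // A real runner would publish these segments atomically at this point.
      return allSucceeded ? allSegments : null;
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Each sub-task sees only its own split, so rows that share a rollup key can land in different segments. That is why this approach offers only best-effort rollup.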
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201773312 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskClient.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.indexing.common.IndexTaskClient; +import io.druid.indexing.common.TaskInfoProvider; +import io.druid.java.util.common.ISE; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.timeline.DataSegment; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.Duration; + +import java.io.IOException; +import java.util.List; + +public class ParallelIndexTaskClient extends IndexTaskClient +{ + private final String subtaskId; + + public ParallelIndexTaskClient( Review comment: Suggest making this package-private so that no one can pass a non-Smile object mapper to the constructor.
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201773728 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskRunner.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.indexing.common.task; + +import io.druid.indexer.TaskState; +import io.druid.indexing.common.TaskToolbox; + +/** + * ParallelIndexTaskRunner is the actual task runner of {@link ParallelIndexSupervisorTask}. There is currently a single + * implementation, i.e. {@link SinglePhaseParallelIndexTaskRunner} which supports only best-effort roll-up. We can add + * more implementations in the future. + */ +public interface ParallelIndexTaskRunner +{ + TaskState run(TaskToolbox toolbox) throws Exception; Review comment: This is not terribly descriptive. Can you explain the expectations of the `run` method, especially for future implementations versus the current one?
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201763005 ## File path: indexing-service/src/main/java/io/druid/indexing/common/Counters.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.google.common.util.concurrent.AtomicDouble; Review comment: Did you mean to use the Guava (`com.google.common`) one?
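If the intent was to avoid a Guava dependency here, note that the JDK has no `AtomicDouble`. A common JDK-only substitute stores the double's raw bits in an `AtomicLong` and updates them with a compare-and-set loop. The class below is a hypothetical illustration of that technique, not code from the PR. If only running sums are needed, `java.util.concurrent.atomic.DoubleAdder` is another JDK option.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical JDK-only stand-in for Guava's AtomicDouble.
final class JdkAtomicDouble {
  // The double value is kept as its IEEE 754 bit pattern so AtomicLong's CAS can be used.
  private final AtomicLong bits;

  JdkAtomicDouble(double initialValue) {
    bits = new AtomicLong(Double.doubleToRawLongBits(initialValue));
  }

  double get() {
    return Double.longBitsToDouble(bits.get());
  }

  /** Atomically adds delta and returns the updated value. */
  double addAndGet(double delta) {
    while (true) {
      long currentBits = bits.get();
      double next = Double.longBitsToDouble(currentBits) + delta;
      // Retry if another thread changed the value between get() and compareAndSet().
      if (bits.compareAndSet(currentBits, Double.doubleToRawLongBits(next))) {
        return next;
      }
    }
  }
}
```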
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201762816 ## File path: indexing-service/src/main/java/io/druid/indexing/appenderator/SegmentAllocateActionGenerator.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.indexing.appenderator; + +import io.druid.data.input.InputRow; +import io.druid.indexing.common.actions.TaskAction; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; + +public interface SegmentAllocateActionGenerator +{ + TaskAction generate( Review comment: Can you add Javadocs describing what this method is supposed to do and what guarantees implementations should provide?
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201760102 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java ## @@ -180,18 +113,17 @@ public boolean resume(final String id) if (response.getStatus().equals(HttpResponseStatus.OK)) { log.info("Task [%s] paused successfully", id); -return jsonMapper.readValue(response.getContent(), new TypeReference>() +return deserialize(response.getContent(), new TypeReference>() Review comment: Is the shape of the returned object defined anywhere? Is there any way to use a dedicated class instead of a map?
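A dedicated class would give the pause response a documented shape instead of a bare map. The sketch below is hypothetical and JDK-only. `PausedOffsets`, `offsetFor`, and the "-1 when absent" convention are illustrative names, not Druid code. It assumes the payload is a map from Kafka partition id to the offset at which that partition was paused, since the generic parameters of the `TypeReference` in the quoted diff are not visible.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical typed wrapper for a pause response, assuming the payload is
// partition id -> offset at which that partition was paused.
final class PausedOffsets {
  private final Map<Integer, Long> offsets;

  PausedOffsets(Map<Integer, Long> offsets) {
    // Defensive, unmodifiable copy so later changes to the input map are not visible.
    this.offsets = Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(offsets, "offsets")));
  }

  /** Returns the paused offset for a partition, or -1 if the partition is absent. */
  long offsetFor(int partition) {
    return offsets.getOrDefault(partition, -1L);
  }

  /** Exposes the underlying read-only map, e.g. for serialization. */
  Map<Integer, Long> asMap() {
    return offsets;
  }
}
```

With Jackson, a delegating `@JsonCreator` on the constructor plus `@JsonValue` on `asMap()` should let the client deserialize straight into such a type while keeping the same wire format. Verify this against the Jackson version in use.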
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201761280 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java ## @@ -362,317 +288,70 @@ public boolean setEndOffsets( log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize); try { - final FullResponseHolder response = submitRequest( + final FullResponseHolder response = submitJsonRequest( id, HttpMethod.POST, "offsets/end", StringUtils.format("finish=%s", finalize), - jsonMapper.writeValueAsBytes(endOffsets), + serialize(endOffsets), true ); - return response.getStatus().getCode() / 100 == 2; + return isSuccess(response); } catch (NoTaskLocationException e) { return false; } catch (IOException e) { - throw new RuntimeException(e); + throw Throwables.propagate(e); Review comment: Would it make sense to just allow the method to throw `IOException`?
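For a checked exception such as `IOException`, `Throwables.propagate(e)` wraps it in a plain `RuntimeException`, just like the line it replaces. The diff therefore does not change behavior, and the real question is whether callers should see the checked exception. The sketch below contrasts the two options using only the JDK. `EndOffsetsClientSketch` and `sendRequest` are hypothetical stand-ins for the client and `submitJsonRequest`. `UncheckedIOException` is used instead of `Throwables.propagate`, which later Guava releases deprecated.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class EndOffsetsClientSketch {
  /** Hypothetical stand-in for the HTTP call; fails when the task cannot be reached. */
  static int sendRequest(String taskId) throws IOException {
    if (taskId.isEmpty()) {
      throw new IOException("no location for task");
    }
    return 200;
  }

  // Option 1 (current shape): hide the checked exception behind an unchecked wrapper.
  static boolean setEndOffsetsUnchecked(String taskId) {
    try {
      return sendRequest(taskId) / 100 == 2;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Option 2 (suggested): declare IOException so each caller decides how to handle it.
  static boolean setEndOffsets(String taskId) throws IOException {
    return sendRequest(taskId) / 100 == 2;
  }
}
```

With option 2, every caller (for example, the supervisor) must catch or declare `IOException`. In exchange, callers can tell transient I/O failures apart from programming errors instead of receiving an opaque `RuntimeException`.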
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201767065 ## File path: indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.druid.indexing.common.Counters; +import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.SortedSet; + +/** + * This action is to find a proper {@link io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of + * the next shard number per {@link Interval} in {@link Counters}. The next shard number is incremented by 1 whenever a + * new {@link SegmentIdentifier} is allocated. 
+ */ +public class CountingSegmentAllocateAction implements TaskAction +{ + private final String dataSource; + private final DateTime timestamp; + private final GranularitySpec granularitySpec; + @JsonDeserialize(keyUsing = IntervalDeserializer.class) + private final Map versions; + + private final SortedSet bucketIntervals; + + @JsonCreator + public CountingSegmentAllocateAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("timestamp") DateTime timestamp, + @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("versions") Map versions + ) + { +this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); +this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp"); +this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec"); +this.versions = Preconditions.checkNotNull(versions, "versions"); + +this.bucketIntervals = Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), "bucketIntervals"); + } + + @JsonProperty + public String getDataSource() + { +return dataSource; + } + + @JsonProperty + public DateTime getTimestamp() + { +return timestamp; + } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { +return granularitySpec; + } + + @JsonProperty + public Map getVersions() + { +return versions; + } + + @Override + public TypeReference getReturnTypeReference() + { +return new TypeReference() Review comment: This can be a static final field.
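An anonymous `TypeReference` subclass is immutable and only records a generic type argument. It can therefore be created once and shared instead of being allocated on every `getReturnTypeReference()` call. To keep the sketch JDK-only, it does not use Jackson. `TypeToken` is a hypothetical minimal class that captures its type argument the same way (through `getGenericSuperclass()`), and `AllocateActionSketch` is a stand-in for the action.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical minimal analogue of Jackson's TypeReference: an anonymous subclass
// records its generic type argument, which is read back reflectively.
abstract class TypeToken<T> {
  private final Type type;

  protected TypeToken() {
    Type superclass = getClass().getGenericSuperclass();
    this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
  }

  Type getType() {
    return type;
  }
}

final class AllocateActionSketch {
  // Created once per class rather than once per call; safe to share because it is immutable.
  private static final TypeToken<List<String>> RETURN_TYPE = new TypeToken<List<String>>() {
  };

  TypeToken<List<String>> getReturnTypeReference() {
    return RETURN_TYPE;
  }
}
```

In the action itself, this would presumably become a `private static final` `TypeReference` constant parameterized with the action's return type and returned from `getReturnTypeReference()`.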
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201774505 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/PushedSegmentsReport.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.timeline.DataSegment; + +import java.util.List; + +public class PushedSegmentsReport Review comment: Can you add a brief comment on when this is used?
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201767683 ## File path: indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.druid.indexing.common.Counters; +import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.SortedSet; + +/** + * This action is to find a proper {@link io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of + * the next shard number per {@link Interval} in {@link Counters}. The next shard number is incremented by 1 whenever a + * new {@link SegmentIdentifier} is allocated. 
+ */ +public class CountingSegmentAllocateAction implements TaskAction +{ + private final String dataSource; + private final DateTime timestamp; + private final GranularitySpec granularitySpec; + @JsonDeserialize(keyUsing = IntervalDeserializer.class) + private final Map versions; + + private final SortedSet bucketIntervals; + + @JsonCreator + public CountingSegmentAllocateAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("timestamp") DateTime timestamp, + @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("versions") Map versions + ) + { +this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); +this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp"); +this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec"); +this.versions = Preconditions.checkNotNull(versions, "versions"); + +this.bucketIntervals = Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), "bucketIntervals"); + } + + @JsonProperty + public String getDataSource() + { +return dataSource; + } + + @JsonProperty + public DateTime getTimestamp() + { +return timestamp; + } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { +return granularitySpec; + } + + @JsonProperty + public Map getVersions() + { +return versions; + } + + @Override + public TypeReference getReturnTypeReference() + { +return new TypeReference() +{ +}; + } + + @Override + public SegmentIdentifier perform(Task task, TaskActionToolbox toolbox) + { +Optional maybeInterval = granularitySpec.bucketInterval(timestamp); +if (!maybeInterval.isPresent()) { + throw new ISE("Could not find interval for timestamp [%s]", timestamp); +} + +final Interval interval = maybeInterval.get(); +if (!bucketIntervals.contains(interval)) { + throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec); +} + +final Counters counters = toolbox.getCounters(); Review comment: There's no way to 
clear out counters. How does this work for long-running toolbox instances? For example, if an Overlord runs for weeks or months at a time, will it just keep accumulating interval maps?
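The quoted Javadoc says the action keeps the next shard number per interval in `Counters` and increments it by 1 for each newly allocated `SegmentIdentifier`. One way to bound that state is to scope the counters to the supervisor task that owns them and drop that scope when the task finishes. The sketch below is hypothetical. `ScopedIntervalCounters`, `nextShardNumber`, and `release` are not Druid APIs, and intervals are plain strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ScopedIntervalCounters {
  // supervisorTaskId -> (interval -> next shard number)
  private final Map<String, Map<String, AtomicInteger>> counters = new ConcurrentHashMap<>();

  /** Returns the next shard number for the interval, starting at 0, then increments it. */
  int nextShardNumber(String supervisorTaskId, String interval) {
    return counters
        .computeIfAbsent(supervisorTaskId, id -> new ConcurrentHashMap<>())
        .computeIfAbsent(interval, i -> new AtomicInteger())
        .getAndIncrement();
  }

  /** Drops all counters of a finished supervisor task so the map cannot grow without bound. */
  void release(String supervisorTaskId) {
    counters.remove(supervisorTaskId);
  }

  int numTrackedTasks() {
    return counters.size();
  }
}
```

`release` must run only after no sub-task of that supervisor can still allocate segments. Otherwise, a late allocation would silently restart numbering at 0 and collide with existing shard numbers.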
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201761921 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java ## @@ -50,5 +53,8 @@ @Override public void configure(Binder binder) { +binder.bind(new TypeLiteral>(){}) Review comment: Will this get split onto multiple lines by a code reformat?
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201767881 ## File path: indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.druid.indexing.common.Counters; +import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.SortedSet; + +/** + * This action is to find a proper {@link io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of + * the next shard number per {@link Interval} in {@link Counters}. The next shard number is incremented by 1 whenever a + * new {@link SegmentIdentifier} is allocated. 
+ */ +public class CountingSegmentAllocateAction implements TaskAction +{ + private final String dataSource; + private final DateTime timestamp; + private final GranularitySpec granularitySpec; + @JsonDeserialize(keyUsing = IntervalDeserializer.class) + private final Map versions; + + private final SortedSet bucketIntervals; + + @JsonCreator + public CountingSegmentAllocateAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("timestamp") DateTime timestamp, + @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("versions") Map versions + ) + { +this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); +this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp"); +this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec"); +this.versions = Preconditions.checkNotNull(versions, "versions"); + +this.bucketIntervals = Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), "bucketIntervals"); + } + + @JsonProperty + public String getDataSource() + { +return dataSource; + } + + @JsonProperty + public DateTime getTimestamp() + { +return timestamp; + } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { +return granularitySpec; + } + + @JsonProperty + public Map getVersions() + { +return versions; + } + + @Override + public TypeReference getReturnTypeReference() + { +return new TypeReference() +{ +}; + } + + @Override + public SegmentIdentifier perform(Task task, TaskActionToolbox toolbox) + { +Optional maybeInterval = granularitySpec.bucketInterval(timestamp); +if (!maybeInterval.isPresent()) { + throw new ISE("Could not find interval for timestamp [%s]", timestamp); +} + +final Interval interval = maybeInterval.get(); +if (!bucketIntervals.contains(interval)) { + throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec); +} + +final Counters counters = toolbox.getCounters(); Review comment: That seems like it 
will cause heap problems. Is there any sort of cleanup mechanism?
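The Javadoc above describes the allocation scheme (one counter per `Interval`, incremented each time a new `SegmentIdentifier` is allocated), and the review question is whether those counters are ever released. A minimal, hypothetical sketch of one way to bound that memory, assuming the counters only need to live as long as the supervisor task that owns them. `TaskScopedShardCounters` and its methods are illustrative names, not Druid's `Counters` API, and intervals are modeled as plain strings to keep the example self-contained:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical task-scoped shard counters (not Druid's Counters class).
 * Each supervisor task id owns its own interval -> next-partition-number table,
 * so all of a task's counters can be dropped with a single remove() when it ends.
 */
class TaskScopedShardCounters
{
  private final Map<String, Map<String, AtomicInteger>> countersByTask = new ConcurrentHashMap<>();

  /** Returns the next partition number for (taskId, interval), starting at 0. */
  int nextPartitionNum(String taskId, String interval)
  {
    return countersByTask
        .computeIfAbsent(taskId, id -> new ConcurrentHashMap<>())
        .computeIfAbsent(interval, i -> new AtomicInteger())
        .getAndIncrement();
  }

  /** Drops every counter owned by the task; intended to run when the task succeeds or fails. */
  void release(String taskId)
  {
    countersByTask.remove(taskId);
  }

  /** Number of tasks that currently hold counters (useful for leak checks in tests). */
  int numTrackedTasks()
  {
    return countersByTask.size();
  }
}
```

A call to `nextPartitionNum` racing with `release` for the same task could recreate an empty table, so in this sketch `release` is only safe once no more allocations can arrive for that task. Scoping by task trades an extra map lookup for a single, obvious cleanup point.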
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201781345 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java ## @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link #getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap runningTasks = new ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in {@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap> taskHistories = new ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks)
+  {
+    this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+    this.maxRetry = maxRetry;
+    this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+    log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+    running = true;
+    log.info("Starting taskMonitor");
+    // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
+    // That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+    taskStatusChecker.scheduleAtFixedRate(
+        () -> {
+          try {
+            final Iterator> iterator = runningTasks.entrySet().iterator();
+            while
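The `TaskMonitor` Javadoc above describes a submit / poll / resubmit loop driven by a single-threaded `ScheduledExecutorService`, bounded by `maxRetry` and finished once `expectedNumSucceededTasks` sub-tasks succeed. A stripped-down sketch of that loop, under stated assumptions: `SubTaskStatus`, `StatusSource`, and `PollingTaskMonitor` are hypothetical stand-ins for `TaskState`, the overlord client, and the real monitor, and resubmission simply calls a supplied function. It illustrates the pattern, not the PR's implementation:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical sub-task states (stand-in for Druid's TaskState). */
enum SubTaskStatus { RUNNING, SUCCESS, FAILED }

/** Hypothetical stand-in for the overlord client: task id -> current status. */
interface StatusSource
{
  SubTaskStatus statusOf(String taskId);
}

/** Polls running sub-tasks, resubmits failures up to maxRetry times, and completes when enough succeed. */
class PollingTaskMonitor
{
  private final ScheduledExecutorService checker = Executors.newSingleThreadScheduledExecutor();
  private final Map<String, String> runningTaskIdBySpec = new ConcurrentHashMap<>(); // specId -> latest taskId
  private final Map<String, Integer> attemptsBySpec = new ConcurrentHashMap<>();
  private final CompletableFuture<Boolean> done = new CompletableFuture<>();
  private final StatusSource source;
  private final Function<String, String> submitter; // submits a spec, returns the new task id
  private final int maxRetry;
  private final int expectedSucceeded;
  private int succeeded; // only touched by the single checker thread

  PollingTaskMonitor(StatusSource source, Function<String, String> submitter, int maxRetry, int expectedSucceeded)
  {
    this.source = source;
    this.submitter = submitter;
    this.maxRetry = maxRetry;
    this.expectedSucceeded = expectedSucceeded;
  }

  void submit(String specId)
  {
    attemptsBySpec.merge(specId, 1, Integer::sum);
    runningTaskIdBySpec.put(specId, submitter.apply(specId));
  }

  /** Starts polling; the future yields true if all expected specs succeed, false if one exhausts its retries. */
  CompletableFuture<Boolean> start(long periodMillis)
  {
    checker.scheduleAtFixedRate(this::checkOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
    return done;
  }

  private void checkOnce()
  {
    try {
      for (Map.Entry<String, String> entry : runningTaskIdBySpec.entrySet()) {
        String specId = entry.getKey();
        SubTaskStatus status = source.statusOf(entry.getValue());
        if (status == SubTaskStatus.SUCCESS) {
          runningTaskIdBySpec.remove(specId);
          if (++succeeded >= expectedSucceeded) {
            finish(true);
            return;
          }
        } else if (status == SubTaskStatus.FAILED) {
          if (attemptsBySpec.get(specId) > maxRetry) {
            finish(false); // retries exhausted for this spec
            return;
          }
          submit(specId); // replaces the entry's task id with the resubmitted one
        }
        // RUNNING: look again on the next tick
      }
    }
    catch (RuntimeException e) {
      // An exception escaping a scheduleAtFixedRate task would silently cancel all future runs.
      done.completeExceptionally(e);
      checker.shutdown();
    }
  }

  private void finish(boolean allSucceeded)
  {
    done.complete(allSucceeded);
    checker.shutdown();
  }
}
```

Keeping all status handling on one checker thread is what lets `succeeded` stay a plain `int` here; the concurrent maps only matter for `submit` calls made from other threads before polling starts.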
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201782258 ## File path: indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java ## @@ -465,6 +467,23 @@ public RemoteTaskRunnerConfig getConfig()
     return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
   }
 
+  @Nullable
+  @Override
+  public RunnerTaskState getRunnerTaskState(String taskId)

Review comment: Can this return `Optional<RunnerTaskState>` instead of being `Nullable`?
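To make the `Optional` vs. `@Nullable` trade-off in this comment concrete, here is a small self-contained comparison. It uses `java.util.Optional`; `RunnerState` and `TaskStateLookup` are hypothetical stand-ins for `RunnerTaskState` and the runner's lookup, and the sketch shows the general trade-off, not which variant the PR adopted:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for RunnerTaskState. */
enum RunnerState { WAITING, PENDING, RUNNING }

class TaskStateLookup
{
  private final Map<String, RunnerState> states = new ConcurrentHashMap<>();

  void put(String taskId, RunnerState state)
  {
    states.put(taskId, state);
  }

  /** Nullable style: returns null for unknown tasks; nothing in the type reminds callers to check. */
  RunnerState getStateOrNull(String taskId)
  {
    return states.get(taskId);
  }

  /** Optional style: absence is part of the signature, so callers must unwrap it explicitly. */
  Optional<RunnerState> getState(String taskId)
  {
    return Optional.ofNullable(states.get(taskId));
  }

  /** Caller using the Optional variant: no null check, absence handled by orElse. */
  String describe(String taskId)
  {
    return getState(taskId).map(RunnerState::name).orElse("UNKNOWN");
  }

  /** Equivalent caller using the nullable variant: forgetting the check would throw NullPointerException. */
  String describeNullable(String taskId)
  {
    RunnerState state = getStateOrNull(taskId);
    return state == null ? "UNKNOWN" : state.name();
  }
}
```

Both callers behave the same; the difference is where the reminder lives. With `@Nullable` it depends on annotations and static-analysis tooling, while with `Optional` it is in the return type, at the cost of wrapping a present value on each call. Some of the code quoted in this thread (`CountingSegmentAllocateAction`) already uses Guava's `com.google.common.base.Optional`, which supports the same pattern through `Optional.fromNullable(...)`, `transform(...)`, and `or(...)`.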
[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201780850 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java ## @@ -0,0 +1,502 @@
Re: Gitbox notifications
As it happens, all 3 of Druid's mentors (including myself) are ASF members; their names are shown in bold at https://people.apache.org/phonebook.html?podling=druid. I am busy, but I will try to get to it today.

On Wed, Jul 11, 2018 at 10:20 AM, Roman Leventov wrote:
> It writes "Access restricted to ASF members and PMC chairs only!" to me.
> "ASF members" != any Apache project committers, this is a special title,
> little people have it.
Re: Gitbox notifications
For me it says "Access restricted to ASF members and PMC chairs only!" Note that "ASF member" is not the same as "Apache project committer"; it is a special title that few people hold.

On Wed, 11 Jul 2018 at 11:50, Gian Merlino wrote:
> Infra wrote back asking us to add a comm...@druid.apache.org list via
> https://selfserve.apache.org/. It sounds like once we do that, they can
> move the notifications. I tried logging in with my apache id, but the tool
> wouldn't let me in.
>
> Does anyone else have the ability to use this tool? Maybe one of our
> mentors?
>
> On Sat, Jul 7, 2018 at 8:20 AM Gian Merlino wrote:
>
> > I asked yesterday in our repo migration Infra ticket to adjust the mailing
> > lists:
> > https://issues.apache.org/jira/browse/INFRA-16674?focusedCommentId=16535589=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16535589
> > I am not sure if we can expect Infra to adjust things on the weekend but
> > hopefully it can be done soon.
> >
> > Discussion in this thread is also relevant:
> > https://lists.apache.org/thread.html/dac18141e34cd6f49d8b4136046d6d8e2fd237986c9c6b9a2d12948d@%3Cdev.druid.apache.org%3E
> > I linked some filters there that I'm using to manage the spam a bit.
> >
> > On Sat, Jul 7, 2018 at 4:37 AM Roman Leventov wrote:
> >
> > > Could we forward GitBox notifications to a dedicated mailing list? I'm
> > > afraid that the huge volume of spam will discourage people from
> > > subscribing to this list.
[GitHub] gianm commented on issue #5976: Update license headers.
gianm commented on issue #5976: Update license headers. URL: https://github.com/apache/incubator-druid/pull/5976#issuecomment-404239472 Thanks for reviewing, everyone -- I will merge this.
[GitHub] asdf2014 commented on issue #5980: Various changes about a few coding specifications
asdf2014 commented on issue #5980: Various changes about a few coding specifications URL: https://github.com/apache/incubator-druid/pull/5980#issuecomment-404224485 [job-402722329](https://travis-ci.org/apache/incubator-druid/jobs/402722329) and [job-402722330](https://travis-ci.org/apache/incubator-druid/jobs/402722330) both failed.

job-402722329:
```bash
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.19.1:test (default-test) on project druid-processing: ExecutionException The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
[ERROR] Command was /bin/sh -c cd /home/travis/build/apache/incubator-druid/processing && /usr/lib/jvm/java-8-oracle/jre/bin/java -Xmx768m -Duser.language=en -Duser.country=US -Dfile.encoding=UTF-8 -Duser.timezone=UTC -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Ddruid.indexing.doubleStorage=double -jar /home/travis/build/apache/incubator-druid/processing/target/surefire/surefirebooter678641966742689650.jar /home/travis/build/apache/incubator-druid/processing/target/surefire/surefire1266228946009469426tmp /home/travis/build/apache/incubator-druid/processing/target/surefire/surefire_14722988346361641729tmp
```

It seems that `-Xmx768m` is not enough; maybe we should add `-Xmx2048m -Xms512m` to the maven-surefire-plugin configuration in the druid-processing module?

job-402722330:
```bash
CuratorDruidCoordinatorTest.testMoveSegment:369 » test timed out after 6 ...
```

Even after [PR#5978](https://github.com/apache/incubator-druid/pull/5978), the timeout value is sometimes still not enough.
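If the crash is heap exhaustion in the forked test JVM, one quick way to confirm which `-Xmx` the fork actually received (for example, after changing surefire's `argLine`) is to log the JVM's maximum heap and heap flags from inside a test. A minimal sketch; `ForkedJvmHeapReport` is a hypothetical helper, not part of Druid or surefire:

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for logging what heap settings a (forked) test JVM actually received. */
class ForkedJvmHeapReport
{
  /**
   * Maximum heap the JVM will try to use, in MiB. This is close to, but can be slightly below,
   * the -Xmx value, and is a very large number if the JVM reports no limit.
   */
  static long maxHeapMiB()
  {
    return Runtime.getRuntime().maxMemory() / (1024 * 1024);
  }

  /** The -Xmx / -Xms flags on the JVM command line, e.g. the ones surefire forwards from argLine. */
  static List<String> heapFlags()
  {
    return ManagementFactory.getRuntimeMXBean().getInputArguments().stream()
        .filter(arg -> arg.startsWith("-Xmx") || arg.startsWith("-Xms"))
        .collect(Collectors.toList());
  }

  /** One-line summary suitable for printing from a test's setup method. */
  static String summary()
  {
    return "maxHeap=" + maxHeapMiB() + "MiB flags=" + heapFlags();
  }
}
```

Printing `ForkedJvmHeapReport.summary()` from a test in druid-processing would show whether `-Xmx768m` or a raised value is in effect. If the fork dies without any `OutOfMemoryError` in its output, the cause may lie elsewhere (for example the OS killing the process), so this only confirms or rules out one explanation.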