[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253242662
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -106,10 +106,15 @@ public void start()
 }
 try {
   if (isCacheEnabled) {
-scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
-lifecycleLock.started();
-log.info("MetadataSegmentView Started.");
+try {
+  poll();
+}
+finally {
+  scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
 
 Review comment:
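   Please schedule the next `PollTask` with the proper wait time (the poll period) instead of 0 ms. `poll()` has already run once synchronously just above, so scheduling with no delay polls again immediately.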


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253242562
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -106,10 +106,15 @@ public void start()
 }
 try {
   if (isCacheEnabled) {
-scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
-lifecycleLock.started();
-log.info("MetadataSegmentView Started.");
+try {
+  poll();
+}
+finally {
 
 Review comment:
   Please add a catch block so that exceptions thrown by `poll()` are emitted (e.g. as an alert via the class's `EmittingLogger`).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253188129
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final boolean isCacheEnabled;
+  private final ConcurrentMap publishedSegments;
+  private final ScheduledExecutorService scheduledExec;
+  private final long pollPeriodinMS;
+  private LifecycleLock lifecycleLock = new LifecycleLock();
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig,
+  final PlannerConfig plannerConfig
+  )
+  {
+Preconditions.checkNotNull(plannerConfig, "plannerConfig");
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable();
+this.pollPeriodinMS = plannerConfig.getMetadataSegmentPollPeriod();
+this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : 
null;
+this.scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+if (!lifecycleLock.canStart()) {
+  throw new ISE("can't start.");
+}
+try {
+  if (isCacheEnabled) {
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+lifecycleLock.started();
+log.info("MetadataSegmentView Started.");
+  }
+}
+   

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253155145
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final long POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>(1000);
+  private ScheduledExecutorService scheduledExec;
+  final PlannerConfig plannerConfig;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig,
+  final PlannerConfig plannerConfig
+  )
+  {
+this.plannerConfig = Preconditions.checkNotNull(plannerConfig, 
"plannerConfig");
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+if (plannerConfig.isMetadataSegmentCacheEnable()) {
+  scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+  scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+}
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
 
 Review comment:
   Ah, I think those locks are required for modules that can be started 
and stopped repeatedly, e.g. whenever they gain or lose ZooKeeper leadership. For 
them, start() and stop() can be called concurrently at any time, so they need 
a lock for coordination. 
   However, `MetadataSegmentView` has the same lifecycle as the broker. Once 
the broker is stopped, 

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253134452
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final boolean isCacheEnabled;
+  private final ConcurrentMap publishedSegments;
+  private ScheduledExecutorService scheduledExec;
+  private final long pollPeriodinMS;
+  private LifecycleLock lifecycleLock = new LifecycleLock();
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig,
+  final PlannerConfig plannerConfig
+  )
+  {
+Preconditions.checkNotNull(plannerConfig, "plannerConfig");
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable();
+this.pollPeriodinMS = plannerConfig.getMetadataSegmentPollPeriod();
+this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : 
new ConcurrentHashMap<>();
 
 Review comment:
   Should this be `null` if `isCacheEnabled` is false?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253132485
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final long POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>(1000);
+  private ScheduledExecutorService scheduledExec;
+  final PlannerConfig plannerConfig;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig,
+  final PlannerConfig plannerConfig
+  )
+  {
+this.plannerConfig = Preconditions.checkNotNull(plannerConfig, 
"plannerConfig");
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+if (plannerConfig.isMetadataSegmentCacheEnable()) {
+  scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+  scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+}
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
 
 Review comment:
   Why synchronization?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253133403
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final long POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>(1000);
+  private ScheduledExecutorService scheduledExec;
+  final PlannerConfig plannerConfig;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig,
+  final PlannerConfig plannerConfig
+  )
+  {
+this.plannerConfig = Preconditions.checkNotNull(plannerConfig, 
"plannerConfig");
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+if (plannerConfig.isMetadataSegmentCacheEnable()) {
+  scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+  scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+}
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
 
 Review comment:
   I think it can happen, but it would be better to initialize `scheduledExec` 
in the constructor and make it final rather than using synchronization.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries 

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253134211
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final long POLL_PERIOD_IN_MS = 6;
 
 Review comment:
   Ah, never mind. GitHub was showing an old version.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253133696
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final long POLL_PERIOD_IN_MS = 6;
 
 Review comment:
   @surekhasaharan hmm I don't see a config for this in `PlannerConfig`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-02-01 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r253132922
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final long POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>(1000);
+  private ScheduledExecutorService scheduledExec;
+  final PlannerConfig plannerConfig;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig,
+  final PlannerConfig plannerConfig
+  )
+  {
+this.plannerConfig = Preconditions.checkNotNull(plannerConfig, 
"plannerConfig");
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+if (plannerConfig.isMetadataSegmentCacheEnable()) {
+  scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+  scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+}
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
 
 Review comment:
   Do you mean, stop() can be called before `scheduledExec` is set?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-31 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252903631
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
 ##
 @@ -66,6 +66,14 @@
   @JsonProperty
   private DateTimeZone sqlTimeZone = DateTimeZone.UTC;
 
+  @JsonProperty
+  private boolean metadataSegmentCacheEnable = false;
 
 Review comment:
   Would you please add documentation for this configuration?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252509453
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final long POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>(1000);
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 
timestamp stored,
+// since the presence of a segment with an earlier timestamp indicates that
+// "that" segment is not returned by coordinator in 

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252508479
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final long POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>(1000);
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
 
 Review comment:
   I guess this comment is not correct? The authorization happens in 
`SystemSchema`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252509856
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final Logger log = new Logger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 
timestamp stored,
+// since the presence of a segment with an earlier timestamp indicates that

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252508669
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final long POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>(1000);
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 
timestamp stored,
+// since the presence of a segment with an earlier timestamp indicates that
+// "that" segment is not returned by coordinator in 

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252486640
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final Logger log = new Logger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 
timestamp stored,
+// since the presence of a segment with an earlier timestamp indicates that

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252472637
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final Logger log = new Logger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 
timestamp stored,
+// since the presence of a segment with an earlier timestamp indicates that

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252484385
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/DataSegmentInterner.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import org.apache.druid.timeline.DataSegment;
+
+public class DataSegmentInterner
+{
+  private static final Interner REALTIME_INTERNER = 
Interners.newWeakInterner();
+  private static final Interner HISTORICAL_INTERNER = 
Interners.newWeakInterner();
+
+  private DataSegmentInterner()
+  {
+
+  }
+
+  public static DataSegment intern(DataSegment segment)
+  {
+return segment.getSize() > 0 ? HISTORICAL_INTERNER.intern(segment) : 
REALTIME_INTERNER.intern(segment);
 
 Review comment:
   It's worth adding a comment explaining why `size` can indicate whether a 
segment is realtime or historical.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252483278
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final Logger log = new Logger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 
timestamp stored,
+// since the presence of a segment with an earlier timestamp indicates that

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252474145
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final Logger log = new Logger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
 
 Review comment:
   I think this map will usually be very large. It would be better to 
initialize it with a larger initial capacity, such as 1000.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252473415
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
 
 Review comment:
   Are you going to make this configurable? If so, please add a configuration 
property for it. Otherwise, please rename it to `POLL_PERIOD_IN_MS`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252479790
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final Logger log = new Logger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 
timestamp stored,
+// since the presence of a segment with an earlier timestamp indicates that

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252484268
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/DataSegmentInterner.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import org.apache.druid.timeline.DataSegment;
+
+public class DataSegmentInterner
 
 Review comment:
   Would you please add a Javadoc explaining what this class does and why we 
need two interners?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252473184
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final Logger log = new Logger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 
timestamp stored,
+// since the presence of a segment with an earlier timestamp indicates that

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252479292
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final Map publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  ObjectMapper jsonMapper,
+  BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.scheduleWithFixedDelay(
+() -> poll(),
+0,
+DEFAULT_POLL_PERIOD_IN_MS,
+TimeUnit.MILLISECONDS
+);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler
+);
+
+final DateTime ts = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment currentSegment = metadataSegments.next();
+  final DataSegment interned;
+  if (currentSegment.getSize() > 0) {
+interned = 
DataSegmentInterner.HISTORICAL_INTERNER.intern(currentSegment);
+  } else {
+interned = 
DataSegmentInterner.REALTIME_INTERNER.intern(currentSegment);
+  }
+  publishedSegments.put(interned, ts);
+}
+// filter the segments from cache which may not be present in subsequent 
polling
+publishedSegments.values().removeIf(v -> v != ts);
+
+if (segmentWatcherConfig.getWatchedDataSources() != null) {
+  log.debug(
+  "filtering datasources[%s] in published segments 

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-30 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252479978
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the coordinator in background to keep the latest published 
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in 
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final Logger log = new Logger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  final ObjectMapper jsonMapper,
+  final BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 
timestamp stored,
+// since the presence of a segment with an earlier timestamp indicates that

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-29 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252087227
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/server/http/MetadataResource.java
 ##
 @@ -148,9 +151,25 @@ public Response 
getDatabaseSegmentDataSource(@PathParam("dataSourceName") final
   @GET
   @Path("/segments")
   @Produces(MediaType.APPLICATION_JSON)
-  public Response getDatabaseSegments(@Context final HttpServletRequest req)
+  public Response getDatabaseSegments(
+  @Context final HttpServletRequest req,
+  @QueryParam("datasources") String datasources
 
 Review comment:
   This can simply be a list or a set.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-29 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252091270
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@ManageLifecycle
+public class MetadataSegmentView
 
 Review comment:
   It would be great if you could add a brief description of what this class 
does and how it is used.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-29 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252090538
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  ObjectMapper jsonMapper,
+  BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.scheduleWithFixedDelay(
+() -> {
+  try {
+poll();
+  }
+  catch (JsonProcessingException e) {
+log.makeAlert(e, "Problem polling Coordinator.").emit();
+  }
+},
+0,
+DEFAULT_POLL_PERIOD_IN_MS,
+TimeUnit.MILLISECONDS
+);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll() throws JsonProcessingException
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler,
+segmentWatcherConfig.getWatchedDataSources()
+);
+
+final DateTime timestamp = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment interned = 
DataSegmentInterner.intern(metadataSegments.next());
+  // timestamp is used to filter deleted segments
+  publishedSegments.put(interned, timestamp);
+}
+// filter the segments from cache whose timestamp is not equal to latest 

[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-29 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252085728
 
 

 ##
 File path: 
server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
 ##
 @@ -2213,13 +2213,13 @@ public void run()
 expectedResults.get(k).get(j)
 );
 serverExpectations.get(lastServer).addExpectation(expectation);
-
+EasyMock.expect(mockSegment.getSize()).andReturn(-1L).anyTimes();
 
 Review comment:
   I guess you're simulating a segment from a realtime task. In that case, it would be 
more realistic for it to return 0.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-29 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252090888
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
 ##
 @@ -50,7 +51,7 @@ public ServerSelector(
   TierSelectorStrategy strategy
   )
   {
-this.segment = new AtomicReference<>(segment);
+this.segment = new AtomicReference<>(DataSegmentInterner.intern(segment));
 
 Review comment:
   I'm not sure `segment` still needs to be an `AtomicReference`. Let's figure 
it out in #6952.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-29 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r252091114
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final ConcurrentMap publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  ObjectMapper jsonMapper,
+  BytesAccumulatingResponseHandler responseHandler,
 
 Review comment:
   Please add `final` to these two parameters.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-28 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r251613136
 
 

 ##
 File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
 ##
 @@ -340,27 +332,6 @@ public TableType getJdbcTableType()
   return authorizedSegments.iterator();
 }
 
-private CloseableIterator getAuthorizedPublishedSegments(
 
 Review comment:
   Is this authorization not needed anymore?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #6901: Introduce published segment cache in broker

2019-01-28 Thread GitBox
jihoonson commented on a change in pull request #6901: Introduce published 
segment cache in broker
URL: https://github.com/apache/incubator-druid/pull/6901#discussion_r251608611
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final int DEFAULT_POLL_PERIOD_IN_MS = 6;
+  private static final EmittingLogger log = new 
EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final Map publishedSegments = new 
ConcurrentHashMap<>();
+  private ScheduledExecutorService scheduledExec;
+
+  @Inject
+  public MetadataSegmentView(
+  final @Coordinator DruidLeaderClient druidLeaderClient,
+  ObjectMapper jsonMapper,
+  BytesAccumulatingResponseHandler responseHandler,
+  final BrokerSegmentWatcherConfig segmentWatcherConfig
+  )
+  {
+this.coordinatorDruidLeaderClient = druidLeaderClient;
+this.jsonMapper = jsonMapper;
+this.responseHandler = responseHandler;
+this.segmentWatcherConfig = segmentWatcherConfig;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+scheduledExec.scheduleWithFixedDelay(
+() -> poll(),
+0,
+DEFAULT_POLL_PERIOD_IN_MS,
+TimeUnit.MILLISECONDS
+);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+scheduledExec.shutdownNow();
+scheduledExec = null;
+  }
+
+  private void poll()
+  {
+log.info("polling published segments from coordinator");
+//get authorized published segments from coordinator
+final JsonParserIterator metadataSegments = 
getMetadataSegments(
+coordinatorDruidLeaderClient,
+jsonMapper,
+responseHandler
+);
+
+final DateTime ts = DateTimes.nowUtc();
+while (metadataSegments.hasNext()) {
+  final DataSegment currentSegment = metadataSegments.next();
+  final DataSegment interned;
+  if (currentSegment.getSize() > 0) {
+interned = 
DataSegmentInterner.HISTORICAL_INTERNER.intern(currentSegment);
+  } else {
+interned = 
DataSegmentInterner.REALTIME_INTERNER.intern(currentSegment);
+  }
+  publishedSegments.put(interned, ts);
+}
+// filter the segments from cache which may not be present in subsequent 
polling
+publishedSegments.values().removeIf(v -> v != ts);
 
 Review comment:
   Could you elaborate on what this is doing? And what happens if someone 
reads a segment by calling