[GitHub] zfxzf0421 opened a new issue #6267: Upgrade Issue (12.02) Pausing task until timeout

2018-08-29 Thread GitBox
zfxzf0421 opened a new issue #6267: Upgrade Issue (12.02) Pausing task until timeout
URL: https://github.com/apache/incubator-druid/issues/6267
 
 
   #5983 will cause an upgrade issue.
   It removed the `resume` parameter (which is now effectively always true):
   
   https://github.com/apache/incubator-druid/pull/5983/files#diff-6ceeb9607fd4d7a73bdac826dd010193L323
   
   But in the previous version, the default value of the HTTP POST parameter 'resume' is false:
   
   https://github.com/apache/incubator-druid/pull/5983/files#diff-83c0b637b5e4ea97b13a1e95b7b0a601L1477
   
   So if we upgrade the overlord before the middle managers, all tasks will pause until they time out.
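   
   As a schematic illustration only (a hypothetical JAX-RS handler; the endpoint path and names here are assumptions, not the actual Druid code):
   
   ```java
   import javax.ws.rs.DefaultValue;
   import javax.ws.rs.POST;
   import javax.ws.rs.Path;
   import javax.ws.rs.QueryParam;
   import javax.ws.rs.core.Response;
   
   @Path("/druid/worker/v1/chat/{taskId}")
   public class PauseResourceSketch
   {
     // Pre-#5983 shape: 'resume' defaults to false, so the caller had to send
     // resume=true explicitly for a paused task to start running again.
     @POST
     @Path("/pause")
     public Response pause(@QueryParam("resume") @DefaultValue("false") boolean resume)
     {
       // An upgraded overlord omits the parameter entirely, so an old task
       // falls back to resume=false and stays paused until its timeout.
       return Response.ok().build();
     }
   }
   ```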
   
   I think this issue should be documented at
   http://druid.io/docs/latest/operations/rolling-updates.html





[GitHub] gianm commented on issue #6266: Rename io.druid to org.apache.druid.

2018-08-29 Thread GitBox
gianm commented on issue #6266: Rename io.druid to org.apache.druid.
URL: https://github.com/apache/incubator-druid/pull/6266#issuecomment-417182957
 
 
   Ah, @himanshug your comment makes me realize we should also update the 
migration code in MonitorsConfig. I'll do that now.
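   
   A minimal sketch of what that migration could look like (class and method names here are assumptions, not the actual MonitorsConfig code):
   
   ```java
   // Map monitor class names from the old package prefix onto the new one, so
   // configs written against io.druid keep working after the rename.
   public class MonitorClassNameMigrationSketch
   {
     public static String migrate(String configuredName)
     {
       if (configuredName.startsWith("io.druid.")) {
         return "org.apache.druid." + configuredName.substring("io.druid.".length());
       }
       return configuredName;
     }
   }
   ```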





[GitHub] himanshug edited a comment on issue #6266: Rename io.druid to org.apache.druid.

2018-08-29 Thread GitBox
himanshug edited a comment on issue #6266: Rename io.druid to org.apache.druid.
URL: https://github.com/apache/incubator-druid/pull/6266#issuecomment-417176315
 
 
   added labels for `ReleaseNotes` and `Compatibility` as this one will definitely break all extensions.
   
   also, users using monitors such as `io.druid.java.util.metrics.JvmMonitor` will need to change their configuration.
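   (For instance, a `druid.monitoring.monitors` entry of `["io.druid.java.util.metrics.JvmMonitor"]` would become `["org.apache.druid.java.util.metrics.JvmMonitor"]`.)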
   
   +1 after the successful build.





[GitHub] gianm commented on issue #6266: Rename io.druid to org.apache.druid.

2018-08-29 Thread GitBox
gianm commented on issue #6266: Rename io.druid to org.apache.druid.
URL: https://github.com/apache/incubator-druid/pull/6266#issuecomment-417172303
 
 
   @drcrallen Git actually can figure out that these are renames, but you must 
change these configs first:
   
   ```
   git config --local diff.renameLimit 5000
   git config --local merge.renameLimit 5000
   ```
   
   By default, git will disable rename detection if the number of files is >400 
for diffs or >1000 for merges.
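   
   (With the limits raised, something like `git show -M --stat` on the rename commit should report the io.druid to org.apache.druid moves as renames rather than as delete/add pairs.)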





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213874231
 
 


[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213875010
 
 


[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213874775
 
 


[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213871945
 
 


[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213875621
 
 


[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213862030
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -139,21 +163,19 @@ public CachingClusteredClient(
   @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
   {
-    return new QueryRunner<T>()
-    {
-      @Override
-      public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
-      {
-        return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline);
-      }
-    };
+    return runAndMergeWithTimelineChange(
+        query,
+        // No change, but Function.identity() doesn't work here for some reason
+        identity -> identity
+    );
   }
 
   /**
    * Run a query. The timelineConverter will be given the "master" timeline and can be used to return a different
    * timeline, if desired. This is used by getQueryRunnerForSegments.
    */
-  private <T> Sequence<T> run(
+  @VisibleForTesting
+  <T> Stream<Sequence<T>> run(
 
 Review comment:
   According to the call order, `run()` should be placed after 
`runAndMergeWithTimelineChange()`





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213871349
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +461,248 @@ private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable
       }
     }
 
-    private List<Pair<Interval, byte[]>> pruneSegmentsWithCachedResults(
+    private Pair<ServerToSegment, Optional<byte[]>> lookupInCache(
+        Pair<ServerToSegment, Cache.NamedKey> key,
+        Map<Cache.NamedKey, Optional<byte[]>> cache
+    )
+    {
+      final ServerToSegment segment = key.getLhs();
+      final Cache.NamedKey segmentCacheKey = key.getRhs();
+      final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
+      final Optional<byte[]> cachedValue = Optional
+          .ofNullable(cache.get(segmentCacheKey))
+          // Shouldn't happen in practice, but can screw up unit tests where cache state is mutated in crazy
+          // ways when the cache returns null instead of an optional.
+          .orElse(Optional.empty());
+      if (!cachedValue.isPresent()) {
+        // if populating cache, add segment to list of segments to cache if it is not cached
+        final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
+        addCachePopulatorKey(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
+      }
+      return Pair.of(segment, cachedValue);
+    }
+
+    /**
+     * This materializes the input segment stream in order to let the BulkGet stuff in the cache system work
+     *
+     * @param queryCacheKey The cache key that is for the query (not-segment) portion
+     * @param segments      The segments to check if they are in cache
+     *
+     * @return A stream of the server and segment combinations as well as an optional that is present
+     * if a cached value was found
+     */
+    private Stream<Pair<ServerToSegment, Optional<byte[]>>> maybeFetchCacheResults(
         final byte[] queryCacheKey,
-        final Set<ServerToSegment> segments
+        final Stream<ServerToSegment> segments
     )
     {
       if (queryCacheKey == null) {
-        return Collections.emptyList();
+        return segments.map(s -> Pair.of(s, Optional.empty()));
       }
-      final List<Pair<Interval, byte[]>> alreadyCachedResults = Lists.newArrayList();
-      Map<ServerToSegment, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey);
-      // Pull cached segments from cache and remove from set of segments to query
-      final Map<Cache.NamedKey, byte[]> cachedValues = computeCachedValues(perSegmentCacheKeys);
-
-      perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
-        final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
-
-        final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-        if (cachedValue != null) {
-          // remove cached segment from set of segments to query
-          segments.remove(segment);
-          alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
-        } else if (populateCache) {
-          // otherwise, if populating cache, add segment to list of segments to cache
-          final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
-          addCachePopulatorKey(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
-        }
-      });
-      return alreadyCachedResults;
+      // We materialize the stream here in order to have the bulk cache fetching work as expected
+      final List<Pair<ServerToSegment, Cache.NamedKey>> materializedKeyList = computePerSegmentCacheKeys(
+          segments,
+          queryCacheKey
+      ).collect(Collectors.toList());
+
+      // Do bulk fetch
+      final Map<Cache.NamedKey, Optional<byte[]>> cachedValues = computeCachedValues(materializedKeyList.stream())
+          .collect(Pair.mapCollector());
+
+      // A limitation of the cache system is that the cached values are returned without passing through the original
+      // objects. This hash join is a way to get the ServerToSegment and Optional<byte[]> matched up again
+      return materializedKeyList
+          .stream()
+          .map(serializedPairSegmentAndKey -> lookupInCache(serializedPairSegmentAndKey, cachedValues));
     }
 
-    private Map<ServerToSegment, Cache.NamedKey> computePerSegmentCacheKeys(
-        Set<ServerToSegment> segments,
+    private Stream<Pair<ServerToSegment, Cache.NamedKey>> computePerSegmentCacheKeys(
 
 Review comment:
   This method is used in just one place and the stream is materialized there 
immediately. Could you change this method to return List directly?
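   
   A sketch of the suggested shape (generic parameters restored from the surrounding diff, so treat them as assumptions):
   
   ```java
   private List<Pair<ServerToSegment, Cache.NamedKey>> computePerSegmentCacheKeys(
       Stream<ServerToSegment> segments,
       byte[] queryCacheKey
   )
   {
     // Collect eagerly: the only caller materializes the stream anyway for the
     // bulk cache fetch, so returning a List removes one layer of indirection.
     return segments
         .map(serverToSegment -> Pair.of(
             serverToSegment,
             CacheUtil.computeSegmentCacheKey(
                 serverToSegment.getServer().getSegment().getIdentifier(),
                 serverToSegment.getSegmentDescriptor(),
                 queryCacheKey
             )
         ))
         .collect(Collectors.toList());
   }
   ```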





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213861516
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -139,21 +163,19 @@ public CachingClusteredClient(
   @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
   {
-    return new QueryRunner<T>()
-    {
-      @Override
-      public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
-      {
-        return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline);
-      }
-    };
+    return runAndMergeWithTimelineChange(
 
 Review comment:
   Unnecessary breakdown





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213863127
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -162,34 +184,68 @@ public CachingClusteredClient(
     return new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
   }
 
-  @Override
-  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+  private <T> QueryRunner<T> runAndMergeWithTimelineChange(
+      final Query<T> query,
+      final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
+  )
   {
-    return new QueryRunner<T>()
-    {
-      @Override
-      public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
-      {
-        return CachingClusteredClient.this.run(
+    final OptionalLong mergeBatch = QueryContexts.getIntermediateMergeBatchThreshold(query);
+
+    if (mergeBatch.isPresent()) {
+      final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
+      final QueryToolChest<T, Query<T>> toolChest = queryRunnerFactory.getToolchest();
+      return (queryPlus, responseContext) -> {
+        final Stream<Sequence<T>> sequences = run(
             queryPlus,
             responseContext,
-            timeline -> {
-              final VersionedIntervalTimeline<String, ServerSelector> timeline2 =
-                  new VersionedIntervalTimeline<>(Ordering.natural());
-              for (SegmentDescriptor spec : specs) {
-                final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion());
-                if (entry != null) {
-                  final PartitionChunk<ServerSelector> chunk = entry.getChunk(spec.getPartitionNumber());
-                  if (chunk != null) {
-                    timeline2.add(spec.getInterval(), spec.getVersion(), chunk);
-                  }
-                }
-              }
-              return timeline2;
-            }
+            timelineConverter
+        );
+        return MergeWorkTask.parallelMerge(
+            sequences.parallel(),
+            sequenceStream ->
 
 Review comment:
   Please add type to this variable for readability
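   
   For example, with the parameter type spelled out (a generic Java illustration, not the PR's exact signature):
   
   ```java
   import java.util.function.Function;
   import java.util.stream.Stream;
   
   public class ExplicitLambdaTypeExample
   {
     // An explicitly typed lambda parameter reads better than bare inference
     // when the surrounding generics get complex.
     static final Function<Stream<String>, Long> COUNTER =
         (Stream<String> lines) -> lines.count();
   }
   ```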





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213875276
 
 


[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213863415
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -162,34 +184,68 @@ public CachingClusteredClient(
     return new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
   }
 
-  @Override
-  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+  private <T> QueryRunner<T> runAndMergeWithTimelineChange(
+      final Query<T> query,
+      final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
+  )
   {
-    return new QueryRunner<T>()
-    {
-      @Override
-      public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
-      {
-        return CachingClusteredClient.this.run(
+    final OptionalLong mergeBatch = QueryContexts.getIntermediateMergeBatchThreshold(query);
+
+    if (mergeBatch.isPresent()) {
+      final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
+      final QueryToolChest<T, Query<T>> toolChest = queryRunnerFactory.getToolchest();
+      return (queryPlus, responseContext) -> {
+        final Stream<Sequence<T>> sequences = run(
            queryPlus,
            responseContext,
-            timeline -> {
-              final VersionedIntervalTimeline<String, ServerSelector> timeline2 =
-                  new VersionedIntervalTimeline<>(Ordering.natural());
-              for (SegmentDescriptor spec : specs) {
-                final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion());
-                if (entry != null) {
-                  final PartitionChunk<ServerSelector> chunk = entry.getChunk(spec.getPartitionNumber());
-                  if (chunk != null) {
-                    timeline2.add(spec.getInterval(), spec.getVersion(), chunk);
-                  }
-                }
-              }
-              return timeline2;
-            }
+            timelineConverter
+        );
+        return MergeWorkTask.parallelMerge(
+            sequences.parallel(),
+            sequenceStream ->
+                new FluentQueryRunnerBuilder<>(toolChest)
+                    .create(
+                        queryRunnerFactory.mergeRunners(
+                            mergeFjp,
+                            sequenceStream.map(
+                                s -> (QueryRunner<T>) (ignored0, ignored1) -> (Sequence<T>) s
 
 Review comment:
   Extracting a static factory method `QueryRunner.returnConstant()` would be 
more readable
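   
   A sketch of such a factory (the signature is assumed from the lambda shape in the diff above):
   
   ```java
   // Wraps an already-computed Sequence in a QueryRunner that ignores its inputs.
   static <T> QueryRunner<T> returnConstant(Sequence<T> sequence)
   {
     return (queryPlus, responseContext) -> sequence;
   }
   ```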





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213871698
 
 


[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213866070
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +298,90 @@ public CachingClusteredClient(
         contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
         contextBuilder.put("bySegment", true);
       }
-      return contextBuilder.build();
+      return Collections.unmodifiableMap(contextBuilder);
     }
 
-    Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+    Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
     {
       @Nullable
       TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
       if (timeline == null) {
-        return Sequences.empty();
+        return Stream.empty();
       }
       timeline = timelineConverter.apply(timeline);
       if (uncoveredIntervalsLimit > 0) {
         computeUncoveredIntervals(timeline);
       }
 
-      final Set<ServerToSegment> segments = computeSegmentsToQuery(timeline);
+      Stream<ServerToSegment> segments = computeSegmentsToQuery(timeline);
       @Nullable
       final byte[] queryCacheKey = computeQueryCacheKey();
       if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+        // Materialize for computeCurrentEtag, then re-stream
+        final List<ServerToSegment> materializedSegments = segments.collect(Collectors.toList());
+        segments = materializedSegments.stream();
+
         @Nullable
         final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
         @Nullable
-        final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
+        final String currentEtag = computeCurrentEtag(materializedSegments, queryCacheKey);
         if (currentEtag != null && currentEtag.equals(prevEtag)) {
-          return Sequences.empty();
+          return Stream.empty();
         }
       }
 
-      final List<Pair<Interval, byte[]>> alreadyCachedResults = pruneSegmentsWithCachedResults(queryCacheKey, segments);
-      final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segments);
-      return new LazySequence<>(() -> {
-        List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
-        addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
-        addSequencesFromServer(sequencesByInterval, segmentsByServer);
-        return Sequences
-            .simple(sequencesByInterval)
-            .flatMerge(seq -> seq, query.getResultOrdering());
-      });
+      // This pipeline follows a few general steps:
+      // 1. Fetch cache results - Unfortunately this is an eager operation so that the non cached items can
+      // be batched per server. Cached results are assigned to a mock server ALREADY_CACHED_SERVER
+      // 2. Group the segment information by server
+      // 3. Per server (including the ALREADY_CACHED_SERVER) create the appropriate Sequence results - cached results
+      // are handled in their own merge
+      final Stream<Pair<ServerToSegment, Optional<T>>> cacheResolvedResults = deserializeFromCache(
+          maybeFetchCacheResults(queryCacheKey, segments)
+      );
+      return groupCachedResultsByServer(cacheResolvedResults)
+          .map(this::runOnServer)
+          // We do a hard materialization here so that the resulting spliterators have properties that we want
+          // Otherwise the stream's spliterator is of a hash map entry spliterator from the group-by-server operation
+          // This also causes eager initialization of the **sequences**, aka forking off the direct druid client requests
+          // Sequence result accumulation should still be lazy
+          .collect(Collectors.toList())
+          .stream();
     }
 
-    private Set<ServerToSegment> computeSegmentsToQuery(TimelineLookup<String, ServerSelector> timeline)
+    /**
+     * Create a stream of the partition chunks which are useful in this query
+     *
+     * @param holder The holder of the shard to server component of the timeline
+     *
+     * @return Chunks and the segment descriptors corresponding to the chunk
+     */
+    private Stream<ServerToSegment> extractServerAndSegment(TimelineObjectHolder<String, ServerSelector> holder)
 
 Review comment:
   extractServerAndSegment() should go after computeSegmentsToQuery(). Please 
arrange all methods in this class in the call order!





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213867815
 
 

 
 Review comment:
   Maybe "extractRelevantServersAndSegments" would be a clearer name





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213873737
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +461,248 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
+Pair key,
+Map> cache
+)
+{
+  final ServerToSegment segment = key.getLhs();
+  final Cache.NamedKey segmentCacheKey = key.getRhs();
+  final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
+  final Optional cachedValue = Optional
+  .ofNullable(cache.get(segmentCacheKey))
+  // Shouldn't happen in practice, but can screw up unit tests where 
cache state is mutated in crazy
+  // ways when the cache returns null instead of an optional.
+  .orElse(Optional.empty());
+  if (!cachedValue.isPresent()) {
+// if populating cache, add segment to list of segments to cache if it 
is not cached
+final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
+addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+  }
+  return Pair.of(segment, cachedValue);
+}
+
+/**
+ * This materializes the input segment stream in order to let the BulkGet 
stuff in the cache system work
+ *
+ * @param queryCacheKey The cache key that is for the query (not-segment) 
portion
+ * @param segments  The segments to check if they are in cache
+ *
+ * @return A stream of the server and segment combinations as well as an 
optional that is present
+ * if a cached value was found
+ */
+private Stream>> 
maybeFetchCacheResults(
 final byte[] queryCacheKey,
-final Set segments
+final Stream segments
 )
 {
   if (queryCacheKey == null) {
-return Collections.emptyList();
+return segments.map(s -> Pair.of(s, Optional.empty()));
   }
-  final List> alreadyCachedResults = 
Lists.newArrayList();
-  Map perSegmentCacheKeys = 
computePerSegmentCacheKeys(segments, queryCacheKey);
-  // Pull cached segments from cache and remove from set of segments to 
query
-  final Map cachedValues = 
computeCachedValues(perSegmentCacheKeys);
-
-  perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
-final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
-
-final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-if (cachedValue != null) {
-  // remove cached segment from set of segments to query
-  segments.remove(segment);
-  alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
-} else if (populateCache) {
-  // otherwise, if populating cache, add segment to list of segments 
to cache
-  final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
-  addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
-}
-  });
-  return alreadyCachedResults;
+  // We materialize the stream here in order to have the bulk cache 
fetching work as expected
+  final List> materializedKeyList = 
computePerSegmentCacheKeys(
+  segments,
+  queryCacheKey
+  ).collect(Collectors.toList());
+
+  // Do bulk fetch
+  final Map> cachedValues = 
computeCachedValues(materializedKeyList.stream())
+  .collect(Pair.mapCollector());
+
+  // A limitation of the cache system is that the cached values are 
returned without passing through the original
+  // objects. This hash join is a way to get the ServerToSegment and 
Optional matched up again
+  return materializedKeyList
+  .stream()
+  .map(serializedPairSegmentAndKey -> 
lookupInCache(serializedPairSegmentAndKey, cachedValues));
 }
 
-private Map computePerSegmentCacheKeys(
-Set segments,
+private Stream> 
computePerSegmentCacheKeys(
+Stream segments,
 byte[] queryCacheKey
 )
 {
-  // cacheKeys map must preserve segment ordering, in order for shards to 
always be combined in the same order
-  Map cacheKeys = Maps.newLinkedHashMap();
-  for (ServerToSegment serverToSegment : segments) {
-final Cache.NamedKey segmentCacheKey = 
CacheUtil.computeSegmentCacheKey(
-serverToSegment.getServer().getSegment().getIdentifier(),
-serverToSegment.getSegmentDescriptor(),
-queryCacheKey
-);
-cacheKeys.put(serverToSegment, segmentCacheKey);
-  }
-  return cacheKeys;
+  return segments
+  .map(serverToSegment -> {
+// cacheKeys 

[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213865878
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +298,90 @@ public CachingClusteredClient(
 contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
 contextBuilder.put("bySegment", true);
   }
-  return contextBuilder.build();
+  return Collections.unmodifiableMap(contextBuilder);
 }
 
-Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
 {
   @Nullable
   TimelineLookup timeline = 
serverView.getTimeline(query.getDataSource());
   if (timeline == null) {
-return Sequences.empty();
+return Stream.empty();
   }
   timeline = timelineConverter.apply(timeline);
   if (uncoveredIntervalsLimit > 0) {
 computeUncoveredIntervals(timeline);
   }
 
-  final Set segments = computeSegmentsToQuery(timeline);
+  Stream segments = computeSegmentsToQuery(timeline);
   @Nullable
   final byte[] queryCacheKey = computeQueryCacheKey();
   if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+// Materialize for computeCurrentEtag, then re-stream
+final List materializedSegments = 
segments.collect(Collectors.toList());
+segments = materializedSegments.stream();
+
 @Nullable
 final String prevEtag = (String) 
query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
 @Nullable
-final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
+final String currentEtag = computeCurrentEtag(materializedSegments, 
queryCacheKey);
 if (currentEtag != null && currentEtag.equals(prevEtag)) {
-  return Sequences.empty();
+  return Stream.empty();
 }
   }
 
-  final List> alreadyCachedResults = 
pruneSegmentsWithCachedResults(queryCacheKey, segments);
-  final SortedMap> segmentsByServer = 
groupSegmentsByServer(segments);
-  return new LazySequence<>(() -> {
-List> sequencesByInterval = new 
ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
-addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
-addSequencesFromServer(sequencesByInterval, segmentsByServer);
-return Sequences
-.simple(sequencesByInterval)
-.flatMerge(seq -> seq, query.getResultOrdering());
-  });
+  // This pipeline follows a few general steps:
+  // 1. Fetch cache results - Unfortunately this is an eager operation so 
that the non cached items can
+  // be batched per server. Cached results are assigned to a mock server 
ALREADY_CACHED_SERVER
+  // 2. Group the segment information by server
+  // 3. Per server (including the ALREADY_CACHED_SERVER) create the 
appropriate Sequence results - cached results
+  // are handled in their own merge
+  final Stream>> cacheResolvedResults = 
deserializeFromCache(
+  maybeFetchCacheResults(queryCacheKey, segments)
+  );
+  return groupCachedResultsByServer(cacheResolvedResults)
+  .map(this::runOnServer)
+  // We do a hard materialization here so that the resulting 
spliterators have properties that we want
+  // Otherwise the stream's spliterator is of a hash map entry 
spliterator from the group-by-server operation
+  // This also causes eager initialization of the **sequences**, aka 
forking off the direct druid client requests
+  // Sequence result accumulation should still be lazy
+  .collect(Collectors.toList())
 
 Review comment:
   So could you make the run() method return a Collection or List instead of a Stream?
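
   A minimal standalone sketch of the trade-off behind this suggestion (the
   names and types below are illustrative, not Druid's): once a pipeline ends
   in collect(Collectors.toList()), returning the List makes the eager
   evaluation explicit and lets callers traverse the results more than once,
   which re-wrapping the list in a Stream hides.

   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   public class ReturnListNotStream
   {
     // The pipeline is already materialized here, so returning the List
     // costs nothing extra and is more honest about (non-)laziness.
     static List<String> runByServer(Stream<String> perServerResults)
     {
       return perServerResults
           .map(r -> "merged:" + r)
           .collect(Collectors.toList());
     }

     public static void main(String[] args)
     {
       List<String> results = runByServer(Stream.of("server1", "server2"));
       results.forEach(System.out::println);
       System.out.println(results.size()); // a List can be re-traversed; a Stream cannot
     }
   }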





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213861485
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -139,21 +163,19 @@ public CachingClusteredClient(
   @Override
   public  QueryRunner getQueryRunnerForIntervals(final Query query, 
final Iterable intervals)
   {
-return new QueryRunner()
-{
-  @Override
-  public Sequence run(final QueryPlus queryPlus, final Map responseContext)
-  {
-return CachingClusteredClient.this.run(queryPlus, responseContext, 
timeline -> timeline);
-  }
-};
+return runAndMergeWithTimelineChange(
+query,
+// No change, but Function.identity() doesn't work here for some reason
+identity -> identity
 
 Review comment:
   `UnaryOperator.identity()` works
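
   A self-contained illustration of the point (hypothetical names, not Druid
   code): java.util.function.Function.identity() returns a Function, which is
   a supertype of UnaryOperator and therefore cannot be passed where a
   UnaryOperator parameter is declared, while UnaryOperator.identity() can.

   import java.util.function.UnaryOperator;

   public class IdentityExample
   {
     // Stand-in for a method that, like runAndMergeWithTimelineChange,
     // declares its parameter as a UnaryOperator rather than a Function.
     static String applyOp(UnaryOperator<String> op, String in)
     {
       return op.apply(in);
     }

     public static void main(String[] args)
     {
       // applyOp(Function.identity(), "x");  // would not compile: a
       // Function<String, String> is not a UnaryOperator<String>.
       System.out.println(applyOp(UnaryOperator.identity(), "x"));
     }
   }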





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213866741
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +298,90 @@ public CachingClusteredClient(
 contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
 contextBuilder.put("bySegment", true);
   }
-  return contextBuilder.build();
+  return Collections.unmodifiableMap(contextBuilder);
 }
 
-Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
 {
   @Nullable
   TimelineLookup timeline = 
serverView.getTimeline(query.getDataSource());
   if (timeline == null) {
-return Sequences.empty();
+return Stream.empty();
   }
   timeline = timelineConverter.apply(timeline);
   if (uncoveredIntervalsLimit > 0) {
 computeUncoveredIntervals(timeline);
   }
 
-  final Set segments = computeSegmentsToQuery(timeline);
+  Stream segments = computeSegmentsToQuery(timeline);
   @Nullable
   final byte[] queryCacheKey = computeQueryCacheKey();
   if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+// Materialize for computeCurrentEtag, then re-stream
+final List materializedSegments = 
segments.collect(Collectors.toList());
+segments = materializedSegments.stream();
+
 @Nullable
 final String prevEtag = (String) 
query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
 @Nullable
-final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
+final String currentEtag = computeCurrentEtag(materializedSegments, 
queryCacheKey);
 if (currentEtag != null && currentEtag.equals(prevEtag)) {
-  return Sequences.empty();
+  return Stream.empty();
 }
   }
 
-  final List> alreadyCachedResults = 
pruneSegmentsWithCachedResults(queryCacheKey, segments);
-  final SortedMap> segmentsByServer = 
groupSegmentsByServer(segments);
-  return new LazySequence<>(() -> {
-List> sequencesByInterval = new 
ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
-addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
-addSequencesFromServer(sequencesByInterval, segmentsByServer);
-return Sequences
-.simple(sequencesByInterval)
-.flatMerge(seq -> seq, query.getResultOrdering());
-  });
+  // This pipeline follows a few general steps:
+  // 1. Fetch cache results - Unfortunately this is an eager operation so 
that the non cached items can
+  // be batched per server. Cached results are assigned to a mock server 
ALREADY_CACHED_SERVER
+  // 2. Group the segment information by server
+  // 3. Per server (including the ALREADY_CACHED_SERVER) create the 
appropriate Sequence results - cached results
+  // are handled in their own merge
+  final Stream>> cacheResolvedResults = 
deserializeFromCache(
+  maybeFetchCacheResults(queryCacheKey, segments)
+  );
+  return groupCachedResultsByServer(cacheResolvedResults)
+  .map(this::runOnServer)
+  // We do a hard materialization here so that the resulting 
spliterators have properties that we want
+  // Otherwise the stream's spliterator is of a hash map entry 
spliterator from the group-by-server operation
+  // This also causes eager initialization of the **sequences**, aka 
forking off the direct druid client requests
+  // Sequence result accumulation should still be lazy
+  .collect(Collectors.toList())
+  .stream();
 }
 
-private Set computeSegmentsToQuery(TimelineLookup timeline)
+/**
+ * Create a stream of the partition chunks which are useful in this query
+ *
+ * @param holder The holder of the shard to server component of the 
timeline
+ *
+ * @return Chunks and the segment descriptors corresponding to the chunk
+ */
+private Stream 
extractServerAndSegment(TimelineObjectHolder holder)
 {
-  final List> serversLookup = 
toolChest.filterSegments(
-  query,
-  query.getIntervals().stream().flatMap(i -> 
timeline.lookup(i).stream()).collect(Collectors.toList())
-  );
+  return DimFilterUtils
+  .filterShards(
+  query.getFilter(),
+  holder.getObject(),
+  partitionChunk -> 
partitionChunk.getObject().getSegment().getShardSpec(),
+  Maps.newHashMap()
+  )
+  .stream()
+  .map(chunk -> new ServerToSegment(
+  chunk.getObject(),
+  new SegmentDescriptor(holder.getInterval(), holder.getVersion(), 
chunk.getChunkNumber())
+  ));
+}
 
-  final Set 

[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213872779
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +461,248 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
+Pair key,
+Map> cache
+)
+{
+  final ServerToSegment segment = key.getLhs();
+  final Cache.NamedKey segmentCacheKey = key.getRhs();
+  final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
+  final Optional cachedValue = Optional
+  .ofNullable(cache.get(segmentCacheKey))
+  // Shouldn't happen in practice, but can screw up unit tests where 
cache state is mutated in crazy
+  // ways when the cache returns null instead of an optional.
+  .orElse(Optional.empty());
+  if (!cachedValue.isPresent()) {
+// if populating cache, add segment to list of segments to cache if it 
is not cached
+final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
+addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+  }
+  return Pair.of(segment, cachedValue);
+}
+
+/**
+ * This materializes the input segment stream in order to let the BulkGet 
stuff in the cache system work
+ *
+ * @param queryCacheKey The cache key that is for the query (not-segment) 
portion
+ * @param segments  The segments to check if they are in cache
+ *
+ * @return A stream of the server and segment combinations as well as an 
optional that is present
+ * if a cached value was found
+ */
+private Stream>> 
maybeFetchCacheResults(
 final byte[] queryCacheKey,
-final Set segments
+final Stream segments
 )
 {
   if (queryCacheKey == null) {
-return Collections.emptyList();
+return segments.map(s -> Pair.of(s, Optional.empty()));
   }
-  final List> alreadyCachedResults = 
Lists.newArrayList();
-  Map perSegmentCacheKeys = 
computePerSegmentCacheKeys(segments, queryCacheKey);
-  // Pull cached segments from cache and remove from set of segments to 
query
-  final Map cachedValues = 
computeCachedValues(perSegmentCacheKeys);
-
-  perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
-final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
-
-final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-if (cachedValue != null) {
-  // remove cached segment from set of segments to query
-  segments.remove(segment);
-  alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
-} else if (populateCache) {
-  // otherwise, if populating cache, add segment to list of segments 
to cache
-  final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
-  addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
-}
-  });
-  return alreadyCachedResults;
+  // We materialize the stream here in order to have the bulk cache 
fetching work as expected
+  final List> materializedKeyList = 
computePerSegmentCacheKeys(
+  segments,
+  queryCacheKey
+  ).collect(Collectors.toList());
+
+  // Do bulk fetch
+  final Map> cachedValues = 
computeCachedValues(materializedKeyList.stream())
+  .collect(Pair.mapCollector());
+
+  // A limitation of the cache system is that the cached values are 
returned without passing through the original
+  // objects. This hash join is a way to get the ServerToSegment and 
Optional matched up again
+  return materializedKeyList
+  .stream()
+  .map(serializedPairSegmentAndKey -> 
lookupInCache(serializedPairSegmentAndKey, cachedValues));
 }
 
-private Map computePerSegmentCacheKeys(
-Set segments,
+private Stream> 
computePerSegmentCacheKeys(
+Stream segments,
 byte[] queryCacheKey
 )
 {
-  // cacheKeys map must preserve segment ordering, in order for shards to 
always be combined in the same order
-  Map cacheKeys = Maps.newLinkedHashMap();
-  for (ServerToSegment serverToSegment : segments) {
-final Cache.NamedKey segmentCacheKey = 
CacheUtil.computeSegmentCacheKey(
-serverToSegment.getServer().getSegment().getIdentifier(),
-serverToSegment.getSegmentDescriptor(),
-queryCacheKey
-);
-cacheKeys.put(serverToSegment, segmentCacheKey);
-  }
-  return cacheKeys;
+  return segments
+  .map(serverToSegment -> {
+// cacheKeys 

[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213866914
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +298,90 @@ public CachingClusteredClient(
 contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
 contextBuilder.put("bySegment", true);
   }
-  return contextBuilder.build();
+  return Collections.unmodifiableMap(contextBuilder);
 }
 
-Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
 {
   @Nullable
   TimelineLookup timeline = 
serverView.getTimeline(query.getDataSource());
   if (timeline == null) {
-return Sequences.empty();
+return Stream.empty();
   }
   timeline = timelineConverter.apply(timeline);
   if (uncoveredIntervalsLimit > 0) {
 computeUncoveredIntervals(timeline);
   }
 
-  final Set segments = computeSegmentsToQuery(timeline);
+  Stream segments = computeSegmentsToQuery(timeline);
   @Nullable
   final byte[] queryCacheKey = computeQueryCacheKey();
   if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+// Materialize for computeCurrentEtag, then re-stream
+final List materializedSegments = 
segments.collect(Collectors.toList());
+segments = materializedSegments.stream();
+
 @Nullable
 final String prevEtag = (String) 
query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
 @Nullable
-final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
+final String currentEtag = computeCurrentEtag(materializedSegments, 
queryCacheKey);
 if (currentEtag != null && currentEtag.equals(prevEtag)) {
-  return Sequences.empty();
+  return Stream.empty();
 }
   }
 
-  final List> alreadyCachedResults = 
pruneSegmentsWithCachedResults(queryCacheKey, segments);
-  final SortedMap> segmentsByServer = 
groupSegmentsByServer(segments);
-  return new LazySequence<>(() -> {
-List> sequencesByInterval = new 
ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
-addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
-addSequencesFromServer(sequencesByInterval, segmentsByServer);
-return Sequences
-.simple(sequencesByInterval)
-.flatMerge(seq -> seq, query.getResultOrdering());
-  });
+  // This pipeline follows a few general steps:
+  // 1. Fetch cache results - Unfortunately this is an eager operation so 
that the non cached items can
+  // be batched per server. Cached results are assigned to a mock server 
ALREADY_CACHED_SERVER
+  // 2. Group the segment information by server
+  // 3. Per server (including the ALREADY_CACHED_SERVER) create the 
appropriate Sequence results - cached results
+  // are handled in their own merge
+  final Stream>> cacheResolvedResults = 
deserializeFromCache(
+  maybeFetchCacheResults(queryCacheKey, segments)
+  );
+  return groupCachedResultsByServer(cacheResolvedResults)
+  .map(this::runOnServer)
+  // We do a hard materialization here so that the resulting 
spliterators have properties that we want
+  // Otherwise the stream's spliterator is of a hash map entry 
spliterator from the group-by-server operation
+  // This also causes eager initialization of the **sequences**, aka 
forking off the direct druid client requests
+  // Sequence result accumulation should still be lazy
+  .collect(Collectors.toList())
+  .stream();
 }
 
-private Set computeSegmentsToQuery(TimelineLookup timeline)
+/**
+ * Create a stream of the partition chunks which are useful in this query
+ *
+ * @param holder The holder of the shard to server component of the 
timeline
+ *
+ * @return Chunks and the segment descriptors corresponding to the chunk
+ */
+private Stream 
extractServerAndSegment(TimelineObjectHolder holder)
 {
-  final List> serversLookup = 
toolChest.filterSegments(
-  query,
-  query.getIntervals().stream().flatMap(i -> 
timeline.lookup(i).stream()).collect(Collectors.toList())
-  );
+  return DimFilterUtils
+  .filterShards(
+  query.getFilter(),
+  holder.getObject(),
+  partitionChunk -> 
partitionChunk.getObject().getSegment().getShardSpec(),
+  Maps.newHashMap()
+  )
+  .stream()
+  .map(chunk -> new ServerToSegment(
+  chunk.getObject(),
+  new SegmentDescriptor(holder.getInterval(), holder.getVersion(), 
chunk.getChunkNumber())
+  ));
+}
 
-  final Set 

[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213870640
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +461,248 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
+Pair key,
+Map> cache
+)
+{
+  final ServerToSegment segment = key.getLhs();
+  final Cache.NamedKey segmentCacheKey = key.getRhs();
+  final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
+  final Optional cachedValue = Optional
+  .ofNullable(cache.get(segmentCacheKey))
+  // Shouldn't happen in practice, but can screw up unit tests where 
cache state is mutated in crazy
+  // ways when the cache returns null instead of an optional.
+  .orElse(Optional.empty());
+  if (!cachedValue.isPresent()) {
+// if populating cache, add segment to list of segments to cache if it 
is not cached
+final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
+addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+  }
+  return Pair.of(segment, cachedValue);
+}
+
+/**
+ * This materializes the input segment stream in order to let the BulkGet 
stuff in the cache system work
 
 Review comment:
   This is an interesting comment, but it requires a reference for further reading. Why doesn't BulkGet work with unordered segments?
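
   For context, a minimal sketch of the materialize-then-bulk-fetch pattern
   the doc comment describes (the getBulk method is a hypothetical stand-in,
   not Druid's Cache API): streaming lazily would mean one cache round trip
   per segment, whereas collecting the keys first allows a single bulk
   request, after which a hash join re-attaches each value to its key.

   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   public class BulkFetchSketch
   {
     // Hypothetical bulk cache lookup: many keys, one round trip.
     static Map<String, byte[]> getBulk(List<String> keys)
     {
       Map<String, byte[]> hits = new HashMap<>();
       hits.put(keys.get(0), new byte[]{1}); // pretend only the first key is cached
       return hits;
     }

     public static void main(String[] args)
     {
       Stream<String> keyStream = Stream.of("segment-a", "segment-b");

       // Materialize so every key can go into one bulk request.
       List<String> keys = keyStream.collect(Collectors.toList());
       Map<String, byte[]> cached = getBulk(keys);

       // Hash join: match each original key back up with its optional hit.
       List<Optional<byte[]>> results = keys.stream()
           .map(k -> Optional.ofNullable(cached.get(k)))
           .collect(Collectors.toList());
       results.forEach(System.out::println);
     }
   }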





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213863490
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -162,34 +184,68 @@ public CachingClusteredClient(
 return new SpecificQueryRunnable<>(queryPlus, 
responseContext).run(timelineConverter);
   }
 
-  @Override
-  public  QueryRunner getQueryRunnerForSegments(final Query query, 
final Iterable specs)
+  private  QueryRunner runAndMergeWithTimelineChange(
+  final Query query,
+  final UnaryOperator> 
timelineConverter
+  )
   {
-return new QueryRunner()
-{
-  @Override
-  public Sequence run(final QueryPlus queryPlus, final Map responseContext)
-  {
-return CachingClusteredClient.this.run(
+final OptionalLong mergeBatch = 
QueryContexts.getIntermediateMergeBatchThreshold(query);
+
+if (mergeBatch.isPresent()) {
+  final QueryRunnerFactory> queryRunnerFactory = 
conglomerate.findFactory(query);
+  final QueryToolChest> toolChest = 
queryRunnerFactory.getToolchest();
+  return (queryPlus, responseContext) -> {
+final Stream> sequences = run(
 
 Review comment:
   Unnecessary line break. Exactly the same expression at line 225 is not broken across lines.





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213870776
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +461,248 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
+Pair key,
+Map> cache
+)
+{
+  final ServerToSegment segment = key.getLhs();
+  final Cache.NamedKey segmentCacheKey = key.getRhs();
+  final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
+  final Optional cachedValue = Optional
+  .ofNullable(cache.get(segmentCacheKey))
+  // Shouldn't happen in practice, but can screw up unit tests where 
cache state is mutated in crazy
+  // ways when the cache returns null instead of an optional.
+  .orElse(Optional.empty());
+  if (!cachedValue.isPresent()) {
+// if populating cache, add segment to list of segments to cache if it 
is not cached
+final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
+addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+  }
+  return Pair.of(segment, cachedValue);
+}
+
+/**
+ * This materializes the input segment stream in order to let the BulkGet 
stuff in the cache system work
+ *
+ * @param queryCacheKey The cache key that is for the query (not-segment) 
portion
 
 Review comment:
   Maybe "The cache key for the query as a whole" is clearer





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213869762
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +461,248 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
 
 Review comment:
   lookupInCache() should go after maybeFetchCacheResults()





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213865474
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +298,90 @@ public CachingClusteredClient(
 contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
 contextBuilder.put("bySegment", true);
   }
-  return contextBuilder.build();
+  return Collections.unmodifiableMap(contextBuilder);
 }
 
-Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
 
 Review comment:
   Maybe call the method "runByServer" for more clarity. And add a doc comment 
that it returns sequences by server, plus cached results separately.
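
   A rough sketch of what the rename plus doc comment could look like (the
   signature is simplified to plain Strings so the example stands alone; the
   real method works on Sequence and TimelineLookup types):

   import java.util.function.UnaryOperator;
   import java.util.stream.Stream;

   public class RunByServerSketch
   {
     /**
      * Runs the query once per server and returns one result entry per
      * server, with cached results carried as their own separate entry.
      */
     static Stream<String> runByServer(UnaryOperator<String> timelineConverter)
     {
       // Apply the converter to the (simplified) timeline, then emit one
       // entry per server plus a dedicated entry for cached results.
       String timeline = timelineConverter.apply("timeline");
       return Stream.of(
           timeline + ":cached-results",
           timeline + ":server1",
           timeline + ":server2"
       );
     }

     public static void main(String[] args)
     {
       runByServer(UnaryOperator.identity()).forEach(System.out::println);
     }
   }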





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213870996
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +461,248 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
+Pair key,
+Map> cache
+)
+{
+  final ServerToSegment segment = key.getLhs();
+  final Cache.NamedKey segmentCacheKey = key.getRhs();
+  final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
+  final Optional cachedValue = Optional
+  .ofNullable(cache.get(segmentCacheKey))
+  // Shouldn't happen in practice, but can screw up unit tests where 
cache state is mutated in crazy
+  // ways when the cache returns null instead of an optional.
+  .orElse(Optional.empty());
+  if (!cachedValue.isPresent()) {
+// if populating cache, add segment to list of segments to cache if it 
is not cached
+final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
+addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+  }
+  return Pair.of(segment, cachedValue);
+}
+
+/**
+ * This materializes the input segment stream in order to let the BulkGet 
stuff in the cache system work
+ *
+ * @param queryCacheKey The cache key that is for the query (not-segment) 
portion
+ * @param segments  The segments to check if they are in cache
+ *
+ * @return A stream of the server and segment combinations as well as an 
optional that is present
+ * if a cached value was found
+ */
+private Stream>> 
maybeFetchCacheResults(
 final byte[] queryCacheKey,
-final Set segments
+final Stream segments
 )
 {
   if (queryCacheKey == null) {
-return Collections.emptyList();
+return segments.map(s -> Pair.of(s, Optional.empty()));
   }
-  final List> alreadyCachedResults = 
Lists.newArrayList();
-  Map perSegmentCacheKeys = 
computePerSegmentCacheKeys(segments, queryCacheKey);
-  // Pull cached segments from cache and remove from set of segments to 
query
-  final Map cachedValues = 
computeCachedValues(perSegmentCacheKeys);
-
-  perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
-final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
-
-final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-if (cachedValue != null) {
-  // remove cached segment from set of segments to query
-  segments.remove(segment);
-  alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
-} else if (populateCache) {
-  // otherwise, if populating cache, add segment to list of segments 
to cache
-  final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
-  addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
-}
-  });
-  return alreadyCachedResults;
+  // We materialize the stream here in order to have the bulk cache 
fetching work as expected
+  final List> materializedKeyList = 
computePerSegmentCacheKeys(
+  segments,
+  queryCacheKey
+  ).collect(Collectors.toList());
+
+  // Do bulk fetch
+  final Map> cachedValues = 
computeCachedValues(materializedKeyList.stream())
+  .collect(Pair.mapCollector());
+
+  // A limitation of the cache system is that the cached values are 
returned without passing through the original
 
 Review comment:
   Could you create an issue about this limitation and reference it?





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213868302
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +298,90 @@ public CachingClusteredClient(
 contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
 contextBuilder.put("bySegment", true);
   }
-  return contextBuilder.build();
+  return Collections.unmodifiableMap(contextBuilder);
 }
 
-Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
 {
   @Nullable
   TimelineLookup timeline = 
serverView.getTimeline(query.getDataSource());
   if (timeline == null) {
-return Sequences.empty();
+return Stream.empty();
   }
   timeline = timelineConverter.apply(timeline);
   if (uncoveredIntervalsLimit > 0) {
 computeUncoveredIntervals(timeline);
   }
 
-  final Set segments = computeSegmentsToQuery(timeline);
+  Stream segments = computeSegmentsToQuery(timeline);
   @Nullable
   final byte[] queryCacheKey = computeQueryCacheKey();
   if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+// Materialize for computeCurrentEtag, then re-stream
+final List materializedSegments = 
segments.collect(Collectors.toList());
+segments = materializedSegments.stream();
+
 @Nullable
 final String prevEtag = (String) 
query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
 @Nullable
-final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
+final String currentEtag = computeCurrentEtag(materializedSegments, 
queryCacheKey);
 if (currentEtag != null && currentEtag.equals(prevEtag)) {
-  return Sequences.empty();
+  return Stream.empty();
 }
   }
 
-  final List> alreadyCachedResults = 
pruneSegmentsWithCachedResults(queryCacheKey, segments);
-  final SortedMap> segmentsByServer = 
groupSegmentsByServer(segments);
-  return new LazySequence<>(() -> {
-List> sequencesByInterval = new 
ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
-addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
-addSequencesFromServer(sequencesByInterval, segmentsByServer);
-return Sequences
-.simple(sequencesByInterval)
-.flatMerge(seq -> seq, query.getResultOrdering());
-  });
+  // This pipeline follows a few general steps:
+  // 1. Fetch cache results - Unfortunately this is an eager operation so 
that the non cached items can
+  // be batched per server. Cached results are assigned to a mock server 
ALREADY_CACHED_SERVER
+  // 2. Group the segment information by server
+  // 3. Per server (including the ALREADY_CACHED_SERVER) create the 
appropriate Sequence results - cached results
+  // are handled in their own merge
+  final Stream>> cacheResolvedResults = 
deserializeFromCache(
+  maybeFetchCacheResults(queryCacheKey, segments)
+  );
+  return groupCachedResultsByServer(cacheResolvedResults)
+  .map(this::runOnServer)
+  // We do a hard materialization here so that the resulting 
spliterators have properties that we want
+  // Otherwise the stream's spliterator is of a hash map entry 
spliterator from the group-by-server operation
+  // This also causes eager initialization of the **sequences**, aka 
forking off the direct druid client requests
+  // Sequence result accumulation should still be lazy
+  .collect(Collectors.toList())
+  .stream();
 }
 
-private Set computeSegmentsToQuery(TimelineLookup timeline)
+/**
+ * Create a stream of the partition chunks which are useful in this query
+ *
+ * @param holder The holder of the shard to server component of the 
timeline
 
 Review comment:
   I would say that these param and return tags don't add any useful information.





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213870124
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +461,248 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
+Pair key,
+Map> cache
+)
+{
+  final ServerToSegment segment = key.getLhs();
+  final Cache.NamedKey segmentCacheKey = key.getRhs();
+  final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
+  final Optional cachedValue = Optional
+  .ofNullable(cache.get(segmentCacheKey))
+  // Shouldn't happen in practice, but can screw up unit tests where 
cache state is mutated in crazy
+  // ways when the cache returns null instead of an optional.
+  .orElse(Optional.empty());
+  if (!cachedValue.isPresent()) {
+// if populating cache, add segment to list of segments to cache if it 
is not cached
+final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
+addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+  }
+  return Pair.of(segment, cachedValue);
+}
+
+/**
+ * This materializes the input segment stream in order to let the BulkGet 
stuff in the cache system work
+ *
+ * @param queryCacheKey The cache key that is for the query (not-segment) 
portion
+ * @param segments  The segments to check if they are in cache
+ *
+ * @return A stream of the server and segment combinations as well as an 
optional that is present
+ * if a cached value was found
+ */
+private Stream>> 
maybeFetchCacheResults(
 final byte[] queryCacheKey,
 
 Review comment:
   This param should be `@Nullable`
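
   A small standalone example of the annotation being asked for, assuming the
   JSR-305 javax.annotation.Nullable (requires the jsr305 jar on the
   classpath); the method below is illustrative, not Druid's:

   import javax.annotation.Nullable;

   public class NullableParamSketch
   {
     // @Nullable documents that callers may legitimately pass null, matching
     // the queryCacheKey == null branch in the method under review.
     static String describe(@Nullable byte[] queryCacheKey)
     {
       if (queryCacheKey == null) {
         return "query caching disabled";
       }
       return "cache key of " + queryCacheKey.length + " bytes";
     }

     public static void main(String[] args)
     {
       System.out.println(describe(null));
       System.out.println(describe(new byte[]{42}));
     }
   }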





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213864457
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +298,90 @@ public CachingClusteredClient(
 contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
 contextBuilder.put("bySegment", true);
   }
-  return contextBuilder.build();
+  return Collections.unmodifiableMap(contextBuilder);
 }
 
-Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
 {
   @Nullable
   TimelineLookup timeline = 
serverView.getTimeline(query.getDataSource());
   if (timeline == null) {
-return Sequences.empty();
+return Stream.empty();
   }
   timeline = timelineConverter.apply(timeline);
   if (uncoveredIntervalsLimit > 0) {
 computeUncoveredIntervals(timeline);
   }
 
-  final Set segments = computeSegmentsToQuery(timeline);
+  Stream segments = computeSegmentsToQuery(timeline);
   @Nullable
   final byte[] queryCacheKey = computeQueryCacheKey();
   if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+// Materialize for computeCurrentEtag, then re-stream
+final List materializedSegments = 
segments.collect(Collectors.toList());
+segments = materializedSegments.stream();
+
 @Nullable
 final String prevEtag = (String) 
query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
 @Nullable
-final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
+final String currentEtag = computeCurrentEtag(materializedSegments, 
queryCacheKey);
 if (currentEtag != null && currentEtag.equals(prevEtag)) {
-  return Sequences.empty();
+  return Stream.empty();
 }
   }
 
-  final List> alreadyCachedResults = 
pruneSegmentsWithCachedResults(queryCacheKey, segments);
-  final SortedMap> segmentsByServer = 
groupSegmentsByServer(segments);
-  return new LazySequence<>(() -> {
-List> sequencesByInterval = new 
ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
-addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
-addSequencesFromServer(sequencesByInterval, segmentsByServer);
-return Sequences
-.simple(sequencesByInterval)
-.flatMerge(seq -> seq, query.getResultOrdering());
-  });
+  // This pipeline follows a few general steps:
+  // 1. Fetch cache results - Unfortunately this is an eager operation so 
that the non cached items can
+  // be batched per server. Cached results are assigned to a mock server 
ALREADY_CACHED_SERVER
+  // 2. Group the segment information by server
+  // 3. Per server (including the ALREADY_CACHED_SERVER) create the 
appropriate Sequence results - cached results
+  // are handled in their own merge
+  final Stream>> cacheResolvedResults = 
deserializeFromCache(
+  maybeFetchCacheResults(queryCacheKey, segments)
+  );
+  return groupCachedResultsByServer(cacheResolvedResults)
+  .map(this::runOnServer)
+  // We do a hard materialization here so that the resulting 
spliterators have properties that we want
+  // Otherwise the stream's spliterator is of a hash map entry 
spliterator from the group-by-server operation
+  // This also causes eager initialization of the **sequences**, aka 
forking off the direct druid client requests
+  // Sequence result accumulation should still be lazy
+  .collect(Collectors.toList())
+  .stream();
 }
 
-private Set computeSegmentsToQuery(TimelineLookup timeline)
+/**
+ * Create a stream of the partition chunks which are useful in this query
+ *
+ * @param holder The holder of the shard to server component of the 
timeline
+ *
+ * @return Chunks and the segment descriptors corresponding to the chunk
+ */
+private Stream 
extractServerAndSegment(TimelineObjectHolder holder)
 {
-  final List> serversLookup = 
toolChest.filterSegments(
-  query,
-  query.getIntervals().stream().flatMap(i -> 
timeline.lookup(i).stream()).collect(Collectors.toList())
-  );
+  return DimFilterUtils
+  .filterShards(
+  query.getFilter(),
+  holder.getObject(),
+  partitionChunk -> 
partitionChunk.getObject().getSegment().getShardSpec(),
+  Maps.newHashMap()
+  )
+  .stream()
+  .map(chunk -> new ServerToSegment(
+  chunk.getObject(),
+  new SegmentDescriptor(holder.getInterval(), holder.getVersion(), 
chunk.getChunkNumber())
+  ));
+}
 
-  final Set 

[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213874142
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +461,248 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
+Pair key,
+Map> cache
+)
+{
+  final ServerToSegment segment = key.getLhs();
+  final Cache.NamedKey segmentCacheKey = key.getRhs();
+  final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
+  final Optional cachedValue = Optional
+  .ofNullable(cache.get(segmentCacheKey))
+  // Shouldn't happen in practice, but can screw up unit tests where 
cache state is mutated in crazy
+  // ways when the cache returns null instead of an optional.
+  .orElse(Optional.empty());
+  if (!cachedValue.isPresent()) {
+// if populating cache, add segment to list of segments to cache if it 
is not cached
+final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
+addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+  }
+  return Pair.of(segment, cachedValue);
+}
+
+/**
+ * This materializes the input segment stream in order to let the BulkGet 
stuff in the cache system work
+ *
+ * @param queryCacheKey The cache key that is for the query (not-segment) 
portion
+ * @param segments  The segments to check if they are in cache
+ *
+ * @return A stream of the server and segment combinations as well as an 
optional that is present
+ * if a cached value was found
+ */
+private Stream>> 
maybeFetchCacheResults(
 final byte[] queryCacheKey,
-final Set segments
+final Stream segments
 )
 {
   if (queryCacheKey == null) {
-return Collections.emptyList();
+return segments.map(s -> Pair.of(s, Optional.empty()));
   }
-  final List> alreadyCachedResults = 
Lists.newArrayList();
-  Map perSegmentCacheKeys = 
computePerSegmentCacheKeys(segments, queryCacheKey);
-  // Pull cached segments from cache and remove from set of segments to 
query
-  final Map cachedValues = 
computeCachedValues(perSegmentCacheKeys);
-
-  perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
-final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
-
-final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-if (cachedValue != null) {
-  // remove cached segment from set of segments to query
-  segments.remove(segment);
-  alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
-} else if (populateCache) {
-  // otherwise, if populating cache, add segment to list of segments 
to cache
-  final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
-  addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
-}
-  });
-  return alreadyCachedResults;
+  // We materialize the stream here in order to have the bulk cache 
fetching work as expected
+  final List> materializedKeyList = 
computePerSegmentCacheKeys(
+  segments,
+  queryCacheKey
+  ).collect(Collectors.toList());
+
+  // Do bulk fetch
+  final Map> cachedValues = 
computeCachedValues(materializedKeyList.stream())
+  .collect(Pair.mapCollector());
+
+  // A limitation of the cache system is that the cached values are 
returned without passing through the original
+  // objects. This hash join is a way to get the ServerToSegment and 
Optional matched up again
+  return materializedKeyList
+  .stream()
+  .map(serializedPairSegmentAndKey -> 
lookupInCache(serializedPairSegmentAndKey, cachedValues));
 }
 
-private Map computePerSegmentCacheKeys(
-Set segments,
+private Stream> 
computePerSegmentCacheKeys(
+Stream segments,
 byte[] queryCacheKey
 )
 {
-  // cacheKeys map must preserve segment ordering, in order for shards to 
always be combined in the same order
-  Map cacheKeys = Maps.newLinkedHashMap();
-  for (ServerToSegment serverToSegment : segments) {
-final Cache.NamedKey segmentCacheKey = 
CacheUtil.computeSegmentCacheKey(
-serverToSegment.getServer().getSegment().getIdentifier(),
-serverToSegment.getSegmentDescriptor(),
-queryCacheKey
-);
-cacheKeys.put(serverToSegment, segmentCacheKey);
-  }
-  return cacheKeys;
+  return segments
+  .map(serverToSegment -> {
+// cacheKeys 

[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213867599
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +298,90 @@ public CachingClusteredClient(
 contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
 contextBuilder.put("bySegment", true);
   }
-  return contextBuilder.build();
+  return Collections.unmodifiableMap(contextBuilder);
 }
 
-Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
 {
   @Nullable
   TimelineLookup timeline = 
serverView.getTimeline(query.getDataSource());
   if (timeline == null) {
-return Sequences.empty();
+return Stream.empty();
   }
   timeline = timelineConverter.apply(timeline);
   if (uncoveredIntervalsLimit > 0) {
 computeUncoveredIntervals(timeline);
   }
 
-  final Set segments = computeSegmentsToQuery(timeline);
+  Stream segments = computeSegmentsToQuery(timeline);
   @Nullable
   final byte[] queryCacheKey = computeQueryCacheKey();
   if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+// Materialize for computeCurrentEtag, then re-stream
+final List materializedSegments = 
segments.collect(Collectors.toList());
+segments = materializedSegments.stream();
+
 @Nullable
 final String prevEtag = (String) 
query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
 @Nullable
-final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
+final String currentEtag = computeCurrentEtag(materializedSegments, 
queryCacheKey);
 if (currentEtag != null && currentEtag.equals(prevEtag)) {
-  return Sequences.empty();
+  return Stream.empty();
 }
   }
 
-  final List> alreadyCachedResults = 
pruneSegmentsWithCachedResults(queryCacheKey, segments);
-  final SortedMap> segmentsByServer = 
groupSegmentsByServer(segments);
-  return new LazySequence<>(() -> {
-List> sequencesByInterval = new 
ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
-addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
-addSequencesFromServer(sequencesByInterval, segmentsByServer);
-return Sequences
-.simple(sequencesByInterval)
-.flatMerge(seq -> seq, query.getResultOrdering());
-  });
+  // This pipeline follows a few general steps:
+  // 1. Fetch cache results - Unfortunately this is an eager operation so 
that the non cached items can
+  // be batched per server. Cached results are assigned to a mock server 
ALREADY_CACHED_SERVER
+  // 2. Group the segment information by server
+  // 3. Per server (including the ALREADY_CACHED_SERVER) create the 
appropriate Sequence results - cached results
+  // are handled in their own merge
+  final Stream>> cacheResolvedResults = 
deserializeFromCache(
+  maybeFetchCacheResults(queryCacheKey, segments)
+  );
+  return groupCachedResultsByServer(cacheResolvedResults)
+  .map(this::runOnServer)
+  // We do a hard materialization here so that the resulting 
spliterators have properties that we want
+  // Otherwise the stream's spliterator is of a hash map entry 
spliterator from the group-by-server operation
+  // This also causes eager initialization of the **sequences**, aka 
forking off the direct druid client requests
+  // Sequence result accumulation should still be lazy
+  .collect(Collectors.toList())
+  .stream();
 }
 
-private Set computeSegmentsToQuery(TimelineLookup timeline)
+/**
+ * Create a stream of the partition chunks which are useful in this query
 
 Review comment:
   What do you mean here by "useful"?





[GitHub] b-slim commented on issue #6266: Rename io.druid to org.apache.druid.

2018-08-29 Thread GitBox
b-slim commented on issue #6266: Rename io.druid to org.apache.druid.
URL: https://github.com/apache/incubator-druid/pull/6266#issuecomment-417145271
 
 
   after testing, my browser crashes when loading the pages... will have to 
trust @gianm on this one.





[GitHub] drcrallen commented on issue #6266: Rename io.druid to org.apache.druid.

2018-08-29 Thread GitBox
drcrallen commented on issue #6266: Rename io.druid to org.apache.druid.
URL: https://github.com/apache/incubator-druid/pull/6266#issuecomment-417142632
 
 
   @gianm any chance there's a way to get `git` to recognize these as renames 
instead of drops and adds?
   
   As it stands, I don't think a merge from master would be tolerable for any 
outstanding PR, as they would all just be clobbered





[GitHub] fjy commented on issue #6112: Prohibit LinkedList

2018-08-29 Thread GitBox
fjy commented on issue #6112: Prohibit LinkedList
URL: https://github.com/apache/incubator-druid/pull/6112#issuecomment-417137794
 
 
   I'll change my vote to -0. I'm not going to veto the PR, but I also see 
almost no benefit in it.





[GitHub] leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
leventov commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213852342
 
 

 ##
 File path: server/src/main/java/io/druid/client/DirectDruidClient.java
 ##
 @@ -590,6 +591,27 @@ private void init()
 {
   if (jp == null) {
 try {
+  // Safety for if we are in a FJP
+  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker()
 
 Review comment:
   @jihoonson the performance penalty when running outside of a ForkJoinPool 
should be minimal.
   
   @drcrallen please extract a utility method
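To make the suggestion concrete, here is a minimal JDK-only sketch of such a utility method, using nothing beyond `java.util.concurrent`; the class and method names are illustrative, not the actual Druid code:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public final class BlockingUtils
{
  private BlockingUtils() {}

  /**
   * Block on a future. If the calling thread is a ForkJoinPool worker,
   * managedBlock() lets the pool compensate by activating a spare thread,
   * so the blocked worker does not starve the pool.
   */
  public static <T> T getInManagedBlock(final Future<T> future)
      throws InterruptedException, ExecutionException
  {
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker()
    {
      @Override
      public boolean block() throws InterruptedException
      {
        try {
          future.get();
        }
        catch (ExecutionException e) {
          // Swallowed here on purpose; the get() below rethrows it.
        }
        return true;
      }

      @Override
      public boolean isReleasable()
      {
        return future.isDone();
      }
    });
    return future.get();
  }
}
```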





[GitHub] shaharck commented on issue #5150: Druid-parquet-extensions fails on timestamps (stored as INT96) in parquet files

2018-08-29 Thread GitBox
shaharck commented on issue #5150: Druid-parquet-extensions fails on timestamps 
(stored as INT96) in parquet files
URL: 
https://github.com/apache/incubator-druid/issues/5150#issuecomment-417085925
 
 
   thanks @amalakar. Actually, if I exclude the field it does seem to work; I 
just had to find a different field for the timestamp.





[GitHub] clintropolis commented on issue #6129: Add support for 'maxTotalRows' to incremental publishing kafka indexing task and appenderator based realtime task

2018-08-29 Thread GitBox
clintropolis commented on issue #6129: Add support for 'maxTotalRows' to 
incremental publishing kafka indexing task and appenderator based realtime task
URL: https://github.com/apache/incubator-druid/pull/6129#issuecomment-417085164
 
 
   @jihoonson do you have any additional comments? I believe I've addressed all 
existing review comments.





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213811743
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -169,34 +186,64 @@ public CachingClusteredClient(
 return new SpecificQueryRunnable<>(queryPlus, 
responseContext).run(timelineConverter);
   }
 
+  private  Sequence runAndMergeWithTimelineChange(
+  final Query query,
+  final QueryPlus queryPlus,
+  final Map responseContext,
+  final UnaryOperator> 
timelineConverter
+  )
+  {
+final Stream> sequences = run(
+queryPlus,
 
 Review comment:
   fixed





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213811376
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -471,94 +552,162 @@ private CachePopulator getCachePopulator(String 
segmentId, Interval segmentInter
   return cachePopulatorMap.get(StringUtils.format("%s_%s", segmentId, 
segmentInterval));
 }
 
-private SortedMap> 
groupSegmentsByServer(Set segments)
+/**
+ * Check the input stream to see what was cached and what was not. For the 
ones that were cached, merge the results
+ * and return the merged sequence. For the ones that were NOT cached, get 
the server result sequence queued up into
+ * the stream response
+ *
+ * @param segmentOrResult A list that is traversed in order to determine 
what should be sent back. All segments
+ *should be on the same server.
+ *
+ * @return A sequence of either the merged cached results, or the server 
results from any particular server
+ */
+private Sequence runOnServer(List> 
segmentOrResult)
 {
-  final SortedMap> serverSegments = 
Maps.newTreeMap();
-  for (ServerToSegment serverToSegment : segments) {
-final QueryableDruidServer queryableDruidServer = 
serverToSegment.getServer().pick();
-
-if (queryableDruidServer == null) {
-  log.makeAlert(
-  "No servers found for SegmentDescriptor[%s] for DataSource[%s]?! 
How can this be?!",
-  serverToSegment.getSegmentDescriptor(),
-  query.getDataSource()
-  ).emit();
-} else {
-  final DruidServer server = queryableDruidServer.getServer();
-  serverSegments.computeIfAbsent(server, s -> new 
ArrayList<>()).add(serverToSegment.getSegmentDescriptor());
-}
+  final List segmentsOfServer = segmentOrResult.stream(
+  ).map(
+  ServerMaybeSegmentMaybeCache::getSegmentDescriptor
+  ).filter(
+  Optional::isPresent
+  ).map(
+  Optional::get
+  ).collect(
+  Collectors.toList()
+  );
+
+  // We should only ever have cache or queries to run, not both. So if we 
have no segments, try caches
+  if (segmentsOfServer.isEmpty()) {
+// Have a special sequence for the cache results so the merge doesn't 
go all crazy.
+// See 
io.druid.java.util.common.guava.MergeSequenceTest.testScrewsUpOnOutOfOrder for 
an example
+// With zero results actually being found (no segments no caches) this 
should essentially return a no-op
+// merge sequence
+return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(
+segmentOrResult.stream(
+).map(
+ServerMaybeSegmentMaybeCache::getCachedValue
+).filter(
+Optional::isPresent
+).map(
+Optional::get
+).map(
+Collections::singletonList
+).map(
+Sequences::simple
+)
+));
   }
-  return serverSegments;
-}
 
-private void addSequencesFromCache(
-final List> listOfSequences,
-final List> cachedResults
-)
-{
-  if (strategy == null) {
-return;
+  final DruidServer server = segmentOrResult.get(0).getServer();
+  final QueryRunner serverRunner = serverView.getQueryRunner(server);
+
+  if (serverRunner == null) {
+log.error("Server[%s] doesn't have a query runner", server);
+return Sequences.empty();
   }
 
-  final Function pullFromCacheFunction = 
strategy.pullFromSegmentLevelCache();
-  final TypeReference cacheObjectClazz = 
strategy.getCacheObjectClazz();
-  for (Pair cachedResultPair : cachedResults) {
-final byte[] cachedResult = cachedResultPair.rhs;
-Sequence cachedSequence = new BaseSequence<>(
-new BaseSequence.IteratorMaker>()
-{
-  @Override
-  public Iterator make()
-  {
-try {
-  if (cachedResult.length == 0) {
-return Collections.emptyIterator();
-  }
+  final MultipleSpecificSegmentSpec segmentsOfServerSpec = new 
MultipleSpecificSegmentSpec(segmentsOfServer);
 
-  return objectMapper.readValues(
-  objectMapper.getFactory().createParser(cachedResult),
-  cacheObjectClazz
-  );
-}
-catch (IOException e) {
-  throw new RuntimeException(e);
-}
-  }
+  final Sequence serverResults;
+  if (isBySegment) {
+serverResults = getBySegmentServerResults(serverRunner, 
segmentsOfServerSpec);
+  } else if 

[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213811062
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -623,7 +772,41 @@ private void addSequencesFromServer(
 }
 return res;
   })
-  .flatMerge(seq -> seq, query.getResultOrdering());
+  .flatMerge(Function.identity(), query.getResultOrdering());
+}
+  }
+
+  // POJO
+  private static class ServerMaybeSegmentMaybeCache
+  {
+private final DruidServer server;
+private final Optional segmentDescriptor;
 
 Review comment:
   I think I fixed this, the grouping thing must have been an older 
implementation





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213810080
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -169,34 +186,64 @@ public CachingClusteredClient(
 return new SpecificQueryRunnable<>(queryPlus, 
responseContext).run(timelineConverter);
   }
 
+  private  Sequence runAndMergeWithTimelineChange(
+  final Query query,
+  final QueryPlus queryPlus,
+  final Map responseContext,
+  final UnaryOperator> 
timelineConverter
+  )
+  {
+final Stream> sequences = run(
+queryPlus,
+responseContext,
+timelineConverter
+);
+final OptionalLong mergeBatch = 
QueryContexts.getIntermediateMergeBatchThreshold(query);
+if (mergeBatch.isPresent()) {
+  return MergeWorkTask.parallelMerge(
+  query.getResultOrdering(),
+  sequences.parallel(),
+  mergeBatch.getAsLong(),
+  mergeFjp
+  );
+} else {
+  return new MergeSequence<>(
+  query.getResultOrdering(),
 
 Review comment:
   fixed, and immediately above fits on one line as well so went ahead and 
moved that one





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213809885
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -169,34 +186,64 @@ public CachingClusteredClient(
 return new SpecificQueryRunnable<>(queryPlus, 
responseContext).run(timelineConverter);
   }
 
+  private  Sequence runAndMergeWithTimelineChange(
+  final Query query,
+  final QueryPlus queryPlus,
+  final Map responseContext,
+  final UnaryOperator> 
timelineConverter
+  )
+  {
+final Stream> sequences = run(
+queryPlus,
+responseContext,
+timelineConverter
+);
+final OptionalLong mergeBatch = 
QueryContexts.getIntermediateMergeBatchThreshold(query);
+if (mergeBatch.isPresent()) {
+  return MergeWorkTask.parallelMerge(
+  query.getResultOrdering(),
+  sequences.parallel(),
+  mergeBatch.getAsLong(),
+  mergeFjp
+  );
+} else {
+  return new MergeSequence<>(
+  query.getResultOrdering(),
+  Sequences.simple(sequences)
+  );
+}
+  }
+
   @Override
   public  QueryRunner getQueryRunnerForSegments(final Query query, 
final Iterable specs)
   {
-return new QueryRunner()
-{
-  @Override
-  public Sequence run(final QueryPlus queryPlus, final Map responseContext)
-  {
-return CachingClusteredClient.this.run(
-queryPlus,
-responseContext,
-timeline -> {
-  final VersionedIntervalTimeline 
timeline2 =
-  new VersionedIntervalTimeline<>(Ordering.natural());
-  for (SegmentDescriptor spec : specs) {
-final PartitionHolder entry = 
timeline.findEntry(spec.getInterval(), spec.getVersion());
-if (entry != null) {
-  final PartitionChunk chunk = 
entry.getChunk(spec.getPartitionNumber());
-  if (chunk != null) {
-timeline2.add(spec.getInterval(), spec.getVersion(), 
chunk);
-  }
-}
+return (queryPlus, responseContext) -> runAndMergeWithTimelineChange(
+query,
+queryPlus,
+responseContext,
+timeline -> {
+  final VersionedIntervalTimeline timeline2 =
+  new VersionedIntervalTimeline<>(Ordering.natural());
+  for (SegmentDescriptor spec : specs) {
+final PartitionHolder entry = timeline.findEntry(
+spec.getInterval(),
+spec.getVersion()
+);
+if (entry != null) {
+  final PartitionChunk chunk = entry.getChunk(
+  spec.getPartitionNumber());
+  if (chunk != null) {
+timeline2.add(
+spec.getInterval(),
 
 Review comment:
   this has been fixed. Personally, I find a bunch of long lines harder to 
read. I've attempted to strike a reasonable balance in this class





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213808016
 
 

 ##
 File path: 
java-util/src/main/java/io/druid/java/util/common/concurrent/Execs.java
 ##
 @@ -147,4 +151,32 @@ public void rejectedExecution(Runnable r, 
ThreadPoolExecutor executor)
 }
 );
   }
+
+  private static final AtomicLong fjpWorkerThreadCount = new AtomicLong(0L);
+
+  public static ForkJoinWorkerThread makeWorkerThread(String name, 
ForkJoinPool pool)
+  {
+final ForkJoinWorkerThread t = new ForkJoinWorkerThread(pool)
+{
 
 Review comment:
   yep, that works great!





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213807872
 
 

 ##
 File path: 
java-util/src/main/java/io/druid/java/util/common/guava/MergeWorkTask.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.java.util.common.guava;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.druid.java.util.common.Pair;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class MergeWorkTask extends ForkJoinTask>
+{
+
+  /**
+   * Take a stream of sequences, split them as much as possible, and do 
intermediate merges. If the input stream is not
+   * a parallel stream, do a traditional merge. The stream attempts to use 
groups of {@code batchSize} to do its work, but this
+   * goal is on a best effort basis. Input streams that cannot be split or are 
not sized or not subsized might not be
+   * eligible for this parallelization. The intermediate merges are done in 
the passed in ForkJoinPool, but the final
+   * merge is still done when the returned sequence accumulated. The 
intermediate merges are yielded in the order
+   * in which they are ready.
+   *
+   * Exceptions that happen during execution of the merge are passed through 
and bubbled up during the resulting sequence
+   * iteration
+   *
+   * @param mergerFn  The function that will merge a stream of sequences 
into a single sequence. If the baseSequences stream is parallel, this work will 
be done in the FJP, otherwise it will be called directly.
+   * @param baseSequences The sequences that need to be merged
+   * @param batchSize The input stream should be split down to this number 
if possible. This sets the target number of segments per merge thread work
+   * @param fjp   The ForkJoinPool to do the intermediate merges in.
+   * @paramThe result type
+   *
+   * @return A Sequence that will be the merged results of the sub-sequences
+   *
+   * @throws RuntimeException Will throw a RuntimeException while iterating 
through the returned Sequence if a Throwable
+   *  was encountered in an intermediate merge
+   */
+  public static  Sequence parallelMerge(
 
 Review comment:
   Added a big comment section explaining what it is doing and how this 
workflow deviates a bit from a "classic" recursive task.
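For contrast, here is what a "classic" recursive merge task looks like as a self-contained JDK-only toy; this is an illustration of the baseline pattern only, not Druid's MergeWorkTask, which (per the comment above) deviates from this shape by yielding intermediate merges in the order they become ready:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Classic recursive merge in a ForkJoinPool: split the inputs in half,
// merge each half as a subtask, then combine the two results.
public class ParallelListMerge<T> extends RecursiveTask<List<T>>
{
  private final List<List<T>> inputs;   // each inner list is assumed sorted
  private final Comparator<T> ordering;
  private final int batchSize;          // stop splitting at this many inputs

  public ParallelListMerge(List<List<T>> inputs, Comparator<T> ordering, int batchSize)
  {
    this.inputs = inputs;
    this.ordering = ordering;
    this.batchSize = batchSize;
  }

  @Override
  protected List<T> compute()
  {
    if (inputs.size() <= batchSize) {
      return merge(inputs, ordering);
    }
    final int mid = inputs.size() / 2;
    final ParallelListMerge<T> left =
        new ParallelListMerge<>(inputs.subList(0, mid), ordering, batchSize);
    final ParallelListMerge<T> right =
        new ParallelListMerge<>(inputs.subList(mid, inputs.size()), ordering, batchSize);
    left.fork();                        // left half runs asynchronously in the pool
    final List<T> rightResult = right.compute();
    final List<T> leftResult = left.join();
    return merge(Arrays.asList(leftResult, rightResult), ordering);
  }

  private static <T> List<T> merge(List<List<T>> sorted, Comparator<T> ordering)
  {
    final List<T> out = new ArrayList<>();
    sorted.forEach(out::addAll);
    out.sort(ordering);                 // fine for a sketch; a real merge would use a heap
    return out;
  }

  public static void main(String[] args)
  {
    final ForkJoinPool fjp = new ForkJoinPool();
    final List<List<Integer>> inputs = Arrays.asList(
        Arrays.asList(1, 4, 7),
        Arrays.asList(2, 5),
        Arrays.asList(3, 6, 8)
    );
    // Prints [1, 2, 3, 4, 5, 6, 7, 8]
    System.out.println(fjp.invoke(new ParallelListMerge<>(inputs, Comparator.<Integer>naturalOrder(), 2)));
  }
}
```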





[GitHub] jihoonson commented on issue #6135: Add support 'keepSegmentGranularity' for automatic compaction

2018-08-29 Thread GitBox
jihoonson commented on issue #6135: Add support 'keepSegmentGranularity' for 
automatic compaction
URL: 
https://github.com/apache/incubator-druid/issues/6135#issuecomment-417067358
 
 
   https://github.com/apache/incubator-druid/pull/6203 should be done before 
this issue.





[GitHub] jihoonson commented on issue #6203: Add support targetCompactionSizeBytes for compactionTask

2018-08-29 Thread GitBox
jihoonson commented on issue #6203: Add support targetCompactionSizeBytes for 
compactionTask
URL: https://github.com/apache/incubator-druid/pull/6203#issuecomment-417065806
 
 
   Let me do more tests before merging this.





[GitHub] a2l007 commented on issue #6265: Securing truststore passwords used for SSL connections to Kafka

2018-08-29 Thread GitBox
a2l007 commented on issue #6265: Securing truststore passwords used for SSL 
connections to Kafka
URL: 
https://github.com/apache/incubator-druid/issues/6265#issuecomment-417061461
 
 
   Makes sense. I'll work on getting a PR out for this.





[GitHub] QiuMM commented on a change in pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
QiuMM commented on a change in pull request #6263: Add ability to specify list 
of task ports
URL: https://github.com/apache/incubator-druid/pull/6263#discussion_r213768163
 
 

 ##
 File path: docs/content/configuration/index.md
 ##
 @@ -1040,6 +1040,7 @@ Middle managers pass their configurations down to their 
child peons. The middle
 |`druid.indexer.runner.javaOptsArray`|A json array of strings to be passed in 
as options to the peon's jvm. This is additive to javaOpts and is recommended 
for properly handling arguments which contain quotes or spaces like 
`["-XX:OnOutOfMemoryError=kill -9 %p"]`|`[]`|
 |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can 
be created in Zookeeper.|524288|
 |`druid.indexer.runner.startPort`|Starting port used for peon processes, 
should be greater than 1023.|8100|
+|`druid.indexer.runner.ports`|A json array of integers to specify ports that 
are used for peon processes. This property is an alternative to 
`druid.indexer.runner.startPort`. If specified and non-empty, ports for one 
peon process will be chosen from these ports rather than using 
`druid.indexer.runner.startPort` to allocate ports.|`[]`|
 
 Review comment:
   Got it. Thanks.





[GitHub] QiuMM commented on a change in pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
QiuMM commented on a change in pull request #6263: Add ability to specify list 
of task ports
URL: https://github.com/apache/incubator-druid/pull/6263#discussion_r213758356
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
 ##
 @@ -55,30 +58,54 @@ private static boolean canBind(int portNum)
 
   public synchronized int findUnusedPort()
   {
-int port = chooseNext(startPort);
-while (!canBind(port)) {
-  port = chooseNext(port + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int port = chooseFromCandidates();
+  usedPorts.add(port);
+  return port;
+} else {
+  int port = chooseNext(startPort);
+  while (!canBind(port)) {
+port = chooseNext(port + 1);
+  }
+  usedPorts.add(port);
+  return port;
 }
-usedPorts.add(port);
-return port;
   }
 
-  public synchronized Pair findTwoConsecutiveUnusedPorts()
+  public synchronized Pair findTwoUnusedPorts()
   {
-int firstPort = chooseNext(startPort);
-while (!canBind(firstPort) || !canBind(firstPort + 1)) {
-  firstPort = chooseNext(firstPort + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int firstPort = chooseFromCandidates();
+  int secondPort = chooseFromCandidates();
+  usedPorts.add(firstPort);
+  usedPorts.add(secondPort);
+  return new Pair<>(firstPort, secondPort);
+} else {
+  int firstPort = chooseNext(startPort);
+  while (!canBind(firstPort) || !canBind(firstPort + 1)) {
+firstPort = chooseNext(firstPort + 1);
+  }
+  usedPorts.add(firstPort);
+  usedPorts.add(firstPort + 1);
+  return new Pair<>(firstPort, firstPort + 1);
 }
-usedPorts.add(firstPort);
-usedPorts.add(firstPort + 1);
-return new Pair<>(firstPort, firstPort + 1);
   }
 
   public synchronized void markPortUnused(int port)
   {
 usedPorts.remove(port);
   }
 
+  private int chooseFromCandidates()
+  {
+for (int port : candidatePorts) {
+  if (!usedPorts.contains(port) && canBind(port)) {
+return port;
+  }
+}
+throw new ISE("All ports are Used..");
 
 Review comment:
   I do not think so. The reason for adding the ability to specify a list of 
task ports is that we need to decide ahead of time exactly which ports peon 
processes are going to use in some containerized environments. So if users 
specify `druid.indexer.runner.ports`, they should ensure these ports are 
usable, and it would be illegal to use ports which are not contained in the 
user-specified list. In my use case, that does an illegal thing and may cause 
potential port conflicts.
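For example, a hypothetical middle manager `runtime.properties` fragment for such a containerized setup (port values are illustrative, and the behavior described in the comments is the one proposed in this PR):

```
druid.indexer.runner.ports=[8100, 8101, 8102, 8103]
# With a non-empty list, peon ports are chosen only from the entries above;
# druid.indexer.runner.startPort then plays no part in allocation.
```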





[GitHub] jon-wei commented on issue #6265: Securing truststore passwords used for SSL connections to Kafka

2018-08-29 Thread GitBox
jon-wei commented on issue #6265: Securing truststore passwords used for SSL 
connections to Kafka
URL: 
https://github.com/apache/incubator-druid/issues/6265#issuecomment-417059459
 
 
   > This could probably be fixed by plugging in a PasswordProvider 
implementation
   
   I think that would work, you could adjust the consumerProperties and resolve 
the PasswordProviders in `getKafkaConsumer()`.
   
   > or maybe reuse the keystore info from the 
druid.server.https.keyStorePassword property
   
   I think the PasswordProvider or another mechanism would be better; there's 
no guarantee that the truststore needed for connecting to Kafka would have the 
same password as the truststore/keystore used by Druid.
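A rough sketch of that approach, assuming `consumerProperties` values may deserialize to `PasswordProvider` instances; the factory class and method names here are hypothetical, not the actual implementation:

```java
import io.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Map;
import java.util.Properties;

public final class KafkaConsumerFactory
{
  private KafkaConsumerFactory() {}

  /**
   * Resolve any PasswordProvider values (e.g. ssl.truststore.password) only
   * at consumer creation time, so the supervisor spec and task logs see the
   * provider JSON rather than the plaintext password.
   */
  public static KafkaConsumer<byte[], byte[]> buildConsumer(Map<String, Object> consumerProperties)
  {
    final Properties props = new Properties();
    for (Map.Entry<String, Object> entry : consumerProperties.entrySet()) {
      final Object value = entry.getValue();
      if (value instanceof PasswordProvider) {
        props.setProperty(entry.getKey(), ((PasswordProvider) value).getPassword());
      } else {
        props.setProperty(entry.getKey(), String.valueOf(value));
      }
    }
    return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
  }
}
```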





[GitHub] a2l007 opened a new issue #6265: Securing truststore passwords used for SSL connections to Kafka

2018-08-29 Thread GitBox
a2l007 opened a new issue #6265: Securing truststore passwords used for SSL 
connections to Kafka
URL: https://github.com/apache/incubator-druid/issues/6265
 
 
   Presently, for the Kafka indexing service, connecting to Kafka via SSL 
requires the user to add the keystore and truststore passwords as plaintext 
data in the `consumerProperties` section. This information is visible from the 
supervisor spec in the overlord console as well as the task logs. This could 
probably be fixed by plugging in a PasswordProvider implementation for these 
properties, or maybe by reusing the keystore info from the 
`druid.server.https.keyStorePassword` property. Any thoughts?
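For context, a hypothetical supervisor spec fragment showing the problem as it stands; the truststore password can only appear as plaintext:

```
"consumerProperties": {
  "bootstrap.servers": "kafka-broker:9092",
  "security.protocol": "SSL",
  "ssl.truststore.location": "/opt/certs/truststore.jks",
  "ssl.truststore.password": "plaintext-secret"
}
```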





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213772755
 
 

 ##
 File path: processing/src/main/java/io/druid/query/QueryContexts.java
 ##
 @@ -218,6 +239,12 @@
 return val == null ? defaultValue : Numbers.parseLong(val);
   }
 
+  static  OptionalLong parseLong(Query query, String key)
 
 Review comment:
   @jihoonson let me know if you still think passing in a default is cleaner in 
such a case





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213772578
 
 

 ##
 File path: processing/src/main/java/io/druid/query/QueryContexts.java
 ##
 @@ -218,6 +239,12 @@
 return val == null ? defaultValue : Numbers.parseLong(val);
   }
 
+  static  OptionalLong parseLong(Query query, String key)
 
 Review comment:
   ah, I remember now. If this setting is not present in the context, then the 
parallel merging is not used and the legacy sequence merge is used instead. As 
such, I can either use a "default" value and check for that, or just return the 
optional, and choose the code branch based on whether it is present.
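A JDK-only illustration of the pattern being described; the context key is borrowed from the accessor name in this PR (the actual key string may differ), and everything else is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Return the optional and let the caller pick the code branch, instead of
// threading a sentinel default value through the lookup.
public class ContextLookup
{
  static OptionalLong parseLong(Map<String, Object> context, String key)
  {
    final Object val = context.get(key);
    return val == null ? OptionalLong.empty() : OptionalLong.of(Long.parseLong(val.toString()));
  }

  public static void main(String[] args)
  {
    final Map<String, Object> context = new HashMap<>();
    context.put("intermediateMergeBatchThreshold", "512");

    final OptionalLong mergeBatch = parseLong(context, "intermediateMergeBatchThreshold");
    if (mergeBatch.isPresent()) {
      System.out.println("parallel merge, batch size " + mergeBatch.getAsLong());  // this branch runs
    } else {
      System.out.println("legacy sequence merge");
    }
  }
}
```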





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213771619
 
 

 ##
 File path: processing/src/main/java/io/druid/query/QueryContexts.java
 ##
 @@ -218,6 +239,12 @@
 return val == null ? defaultValue : Numbers.parseLong(val);
   }
 
+  static  OptionalLong parseLong(Query query, String key)
 
 Review comment:
   for now I'll change it to match





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213771252
 
 

 ##
 File path: processing/src/main/java/io/druid/query/QueryContexts.java
 ##
 @@ -218,6 +239,12 @@
 return val == null ? defaultValue : Numbers.parseLong(val);
   }
 
+  static  OptionalLong parseLong(Query query, String key)
 
 Review comment:
   They were also written before `Optional` was in the Java spec. It feels odd 
to me to have the defaults passed around from the caller to this class just to 
reflect them back. IMHO a long-term solution should just give the `Optional` 
back in all cases and let the caller figure out what to do if there is nothing 
present. The default value would then be handled entirely in the caller, or an 
exception thrown, as appropriate.





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213770519
 
 

 ##
 File path: processing/src/main/java/io/druid/guice/ForkJoinPoolProvider.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.guice;
+
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.inject.Provider;
+import java.util.concurrent.ForkJoinPool;
+
+public class ForkJoinPoolProvider implements Provider
+{
+  private static final Logger LOG = new Logger(ForkJoinPoolProvider.class);
+
+  private final String nameFormat;
+
+  public ForkJoinPoolProvider(String nameFormat)
+  {
+// Fail fast on bad name format
+Execs.checkThreadNameFormat(nameFormat);
+this.nameFormat = nameFormat;
+  }
+
+  @Override
+  public LifecycleForkJoinPool get()
+  {
+return new LifecycleForkJoinPool(
+// This should probably be configurable. Until then, just piggyback 
off the common pool's parallelism
+ForkJoinPool.commonPool().getParallelism(),
+pool -> Execs.makeWorkerThread(nameFormat, pool),
+(t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
 
 Review comment:
   Note that the calls that return `Future` still behave as expected.





[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213770123
 
 

 ##
 File path: processing/src/main/java/io/druid/guice/ForkJoinPoolProvider.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.guice;
+
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.inject.Provider;
+import java.util.concurrent.ForkJoinPool;
+
+public class ForkJoinPoolProvider implements Provider
+{
+  private static final Logger LOG = new Logger(ForkJoinPoolProvider.class);
+
+  private final String nameFormat;
+
+  public ForkJoinPoolProvider(String nameFormat)
+  {
+// Fail fast on bad name format
+Execs.checkThreadNameFormat(nameFormat);
+this.nameFormat = nameFormat;
+  }
+
+  @Override
+  public LifecycleForkJoinPool get()
+  {
+return new LifecycleForkJoinPool(
+// This should probably be configurable. Until then, just piggyback 
off the common pool's parallelism
+ForkJoinPool.commonPool().getParallelism(),
+pool -> Execs.makeWorkerThread(nameFormat, pool),
+(t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
 
 Review comment:
   Added an example. It cannot, due to how the executor service works. Here's 
an example of what's in the log for the exception swallowing test:
   
   ```
   2018-08-29T17:35:32,341 ERROR [test-fjp-1] 
io.druid.guice.ForkJoinPoolProvider - Unhandled exception in thread 
[Thread[test-fjp-1,5,main]]
   
   java.lang.RuntimeException: test exception
at 
io.druid.guice.ForkJoinPoolProviderTest.testThreadSwallowsException(ForkJoinPoolProviderTest.java:55)
 ~[test-classes/:?]
   ```
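A minimal JDK-only reproduction of that behavior, assuming (as the log above shows for this pool) that a plain Runnable submitted via execute() surfaces its exception through the pool's UncaughtExceptionHandler, since there is no Future to carry it; names here are illustrative:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class SwallowDemo
{
  public static void main(String[] args)
  {
    // Wire a logging handler the way ForkJoinPoolProvider does above.
    final ForkJoinPool pool = new ForkJoinPool(
        1,
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        (t, e) -> System.err.printf("Unhandled exception in thread [%s]%n", t),
        false
    );
    // No Future is returned, so the handler is the only place this surfaces.
    pool.execute(() -> {
      throw new RuntimeException("test exception");
    });
    pool.awaitQuiescence(1, TimeUnit.SECONDS);
    pool.shutdown();
  }
}
```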








[GitHub] QiuMM commented on a change in pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
QiuMM commented on a change in pull request #6263: Add ability to specify list 
of task ports
URL: https://github.com/apache/incubator-druid/pull/6263#discussion_r213762509
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
 ##
 @@ -55,30 +58,54 @@ private static boolean canBind(int portNum)
 
   public synchronized int findUnusedPort()
   {
-int port = chooseNext(startPort);
-while (!canBind(port)) {
-  port = chooseNext(port + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int port = chooseFromCandidates();
+  usedPorts.add(port);
+  return port;
+} else {
+  int port = chooseNext(startPort);
+  while (!canBind(port)) {
+port = chooseNext(port + 1);
+  }
+  usedPorts.add(port);
+  return port;
 }
-usedPorts.add(port);
-return port;
   }
 
-  public synchronized Pair findTwoConsecutiveUnusedPorts()
+  public synchronized Pair findTwoUnusedPorts()
   {
-int firstPort = chooseNext(startPort);
-while (!canBind(firstPort) || !canBind(firstPort + 1)) {
-  firstPort = chooseNext(firstPort + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int firstPort = chooseFromCandidates();
+  int secondPort = chooseFromCandidates();
+  usedPorts.add(firstPort);
+  usedPorts.add(secondPort);
+  return new Pair<>(firstPort, secondPort);
+} else {
+  int firstPort = chooseNext(startPort);
+  while (!canBind(firstPort) || !canBind(firstPort + 1)) {
+firstPort = chooseNext(firstPort + 1);
+  }
+  usedPorts.add(firstPort);
+  usedPorts.add(firstPort + 1);
+  return new Pair<>(firstPort, firstPort + 1);
 }
-usedPorts.add(firstPort);
-usedPorts.add(firstPort + 1);
-return new Pair<>(firstPort, firstPort + 1);
   }
 
   public synchronized void markPortUnused(int port)
   {
 usedPorts.remove(port);
   }
 
+  private int chooseFromCandidates()
+  {
+for (int port : candidatePorts) {
+  if (!usedPorts.contains(port) && canBind(port)) {
+return port;
+  }
+}
+throw new ISE("All ports are Used..");
 
 Review comment:
   IMO, druid nodes should use `druid.indexer.runner.startPort` or 
`druid.indexer.runner.ports` to allocate task ports, rather than using both of 
them at the same time. What's more, it will also confuse users if they have 
specified `druid.indexer.runner.ports` but peon processes use ports that are 
not contained in that list. 








[GitHub] QiuMM commented on a change in pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
QiuMM commented on a change in pull request #6263: Add ability to specify list 
of task ports
URL: https://github.com/apache/incubator-druid/pull/6263#discussion_r213767941
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
 ##
 @@ -27,16 +27,19 @@
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.SocketException;
+import java.util.List;
 import java.util.Set;
 
 public class PortFinder
 {
   private final Set usedPorts = Sets.newHashSet();
   private final int startPort;
+  private final List candidatePorts;
 
-  public PortFinder(int startPort)
+  public PortFinder(int startPort, List candidatePorts)
 
 Review comment:
   I am not very willing to do so, since I would have to add an extra `if` 
statement, which would make the code a little uglier than before. If more 
reviewers agree with you, then it will be fine and I'll modify the code.








[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213767209
 
 

 ##
 File path: processing/src/main/java/io/druid/guice/LifecycleForkJoinPool.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.guice;
+
+import io.druid.java.util.common.lifecycle.LifecycleStop;
+import io.druid.java.util.common.logger.Logger;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+public class LifecycleForkJoinPool extends ForkJoinPool
+{
+  private static final Logger LOG = new Logger(LifecycleForkJoinPool.class);
+
+  public LifecycleForkJoinPool(
+  int parallelism,
+  ForkJoinWorkerThreadFactory factory,
+  Thread.UncaughtExceptionHandler handler,
+  boolean asyncMode
+  )
+  {
+super(parallelism, factory, handler, asyncMode);
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+LOG.info("Shutting down ForkJoinPool [%s]", this);
+shutdown();
+try {
+  // Should this be configurable?
 
 Review comment:
   https://github.com/apache/incubator-druid/issues/6264 raised


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
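
A minimal sketch of how the truncated stop() above might continue, assuming
the usual shutdown/awaitTermination pattern; the timeout value here is an
assumption, and a hard-coded wait like it is exactly what the "Should this be
configurable?" comment questions:

  @LifecycleStop
  public void stop()
  {
    LOG.info("Shutting down ForkJoinPool [%s]", this);
    shutdown();
    try {
      // Hypothetical fixed wait; making this configurable is what the review asks about.
      if (!awaitTermination(60, TimeUnit.SECONDS)) {
        LOG.warn("Failed to terminate ForkJoinPool [%s] within the timeout", this);
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }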



[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213765160
 
 

 ##
 File path: 
java-util/src/main/java/io/druid/java/util/common/guava/MergeWorkTask.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.java.util.common.guava;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.druid.java.util.common.Pair;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class MergeWorkTask extends ForkJoinTask>
+{
+
+  /**
+   * Take a stream of sequences, split them as possible, and do intermediate 
merges. If the input stream is not
+   * a parallel stream, do a traditional merge. The stream attempts to use 
groups of {@code batchSize} to do its work, but this
+   * goal is on a best effort basis. Input streams that cannot be split or are 
not sized or not subsized might not be
   * eligible for this parallelization. The intermediate merges are done in 
the passed in ForkJoinPool, but the final
   * merge is still done when the returned sequence is accumulated. The 
intermediate merges are yielded in the order
+   * in which they are ready.
+   *
+   * Exceptions that happen during execution of the merge are passed through 
and bubbled up during the resulting sequence
+   * iteration
+   *
+   * @param mergerFn  The function that will merge a stream of sequences 
into a single sequence. If the baseSequences stream is parallel, this work will 
be done in the FJP, otherwise it will be called directly.
 
 Review comment:
   fixed!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
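
A self-contained sketch of the split/intermediate-merge/final-merge shape the
javadoc above describes, using sorted lists in place of Druid Sequences; this
is not the PR's code, mergeSorted stands in for the mergerFn parameter, and
for simplicity results are joined in submission order rather than readiness
order:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ForkJoinPool;
  import java.util.concurrent.ForkJoinTask;
  import java.util.stream.Collectors;
  import java.util.stream.Stream;

  public class IntermediateMergeSketch
  {
    // Stand-in merger: flatten and re-sort. A real merger would do an ordered k-way merge.
    static List<Integer> mergeSorted(Stream<List<Integer>> inputs)
    {
      return inputs.flatMap(List::stream).sorted().collect(Collectors.toList());
    }

    public static List<Integer> parallelMerge(List<List<Integer>> inputs, int batchSize, ForkJoinPool pool)
    {
      // Submit one intermediate merge per group of batchSize inputs to the pool.
      List<ForkJoinTask<List<Integer>>> intermediate = new ArrayList<>();
      for (int i = 0; i < inputs.size(); i += batchSize) {
        List<List<Integer>> batch = inputs.subList(i, Math.min(i + batchSize, inputs.size()));
        Callable<List<Integer>> work = () -> mergeSorted(batch.stream());
        intermediate.add(pool.submit(work));
      }
      // The final merge still happens on the calling thread, as the javadoc notes.
      return mergeSorted(intermediate.stream().map(ForkJoinTask::join));
    }
  }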



[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213764538
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -162,34 +184,75 @@ public CachingClusteredClient(
 return new SpecificQueryRunnable<>(queryPlus, 
responseContext).run(timelineConverter);
   }
 
-  @Override
-  public  QueryRunner getQueryRunnerForSegments(final Query query, 
final Iterable specs)
+  private  QueryRunner runAndMergeWithTimelineChange(
+  final Query query,
+  final UnaryOperator> 
timelineConverter
+  )
   {
-return new QueryRunner()
-{
-  @Override
-  public Sequence run(final QueryPlus queryPlus, final Map responseContext)
-  {
-return CachingClusteredClient.this.run(
+final OptionalLong mergeBatch = 
QueryContexts.getIntermediateMergeBatchThreshold(query);
+
+if (mergeBatch.isPresent()) {
+  final QueryRunnerFactory> queryRunnerFactory = 
conglomerate.findFactory(query);
+  final QueryToolChest> toolChest = 
queryRunnerFactory.getToolchest();
+  return (queryPlus, responseContext) -> {
+final Stream> sequences = run(
 queryPlus,
 responseContext,
-timeline -> {
-  final VersionedIntervalTimeline 
timeline2 =
-  new VersionedIntervalTimeline<>(Ordering.natural());
-  for (SegmentDescriptor spec : specs) {
-final PartitionHolder entry = 
timeline.findEntry(spec.getInterval(), spec.getVersion());
-if (entry != null) {
-  final PartitionChunk chunk = 
entry.getChunk(spec.getPartitionNumber());
-  if (chunk != null) {
-timeline2.add(spec.getInterval(), spec.getVersion(), 
chunk);
-  }
-}
-  }
-  return timeline2;
-}
+timelineConverter
+);
+return MergeWorkTask.parallelMerge(
+sequences.parallel(),
+sequenceStream ->
+new FluentQueryRunnerBuilder<>(toolChest)
+.create(
+queryRunnerFactory.mergeRunners(
 
 Review comment:
   yes, it looks much better. Added some stuff in `Execs` to support this


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213764464
 
 

 ##
 File path: 
processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
 ##
 @@ -70,87 +74,115 @@ public ChainedExecutionQueryRunner(
   QueryRunner... queryables
   )
   {
-this(exec, queryWatcher, Arrays.asList(queryables));
+this(exec, queryWatcher, Arrays.stream(queryables));
   }
 
   public ChainedExecutionQueryRunner(
   ExecutorService exec,
   QueryWatcher queryWatcher,
   Iterable> queryables
   )
+  {
+this(exec, queryWatcher, StreamSupport.stream(queryables.spliterator(), 
false));
+  }
+
+  public ChainedExecutionQueryRunner(
+  ExecutorService exec,
+  QueryWatcher queryWatcher,
+  Stream> queryables
+  )
   {
 // listeningDecorator will leave PrioritizedExecutorService unchanged,
 // since it already implements ListeningExecutorService
 this.exec = MoreExecutors.listeningDecorator(exec);
-this.queryables = Iterables.unmodifiableIterable(queryables);
 this.queryWatcher = queryWatcher;
+this.queryables = queryables;
   }
 
   @Override
   public Sequence run(final QueryPlus queryPlus, final Map responseContext)
   {
-Query query = queryPlus.getQuery();
+final Query query = queryPlus.getQuery();
 final int priority = QueryContexts.getPriority(query);
-final Ordering ordering = query.getResultOrdering();
+final Ordering ordering = query.getResultOrdering();
 final QueryPlus threadSafeQueryPlus = 
queryPlus.withoutThreadUnsafeState();
-return new BaseSequence>(
+return new BaseSequence<>(
 new BaseSequence.IteratorMaker>()
 {
   @Override
   public Iterator make()
   {
 // Make it a List<> to materialize all of the values (so that it 
will submit everything to the executor)
-ListenableFuture>> futures = Futures.allAsList(
-Lists.newArrayList(
-Iterables.transform(
-queryables,
-input -> {
-  if (input == null) {
-throw new ISE("Null queryRunner! Looks to be some 
segment unmapping action happening");
+final ListenableFuture>> futures = 
GuavaUtils.allFuturesAsList(
+queryables.peek(
+queryRunner -> {
+  if (queryRunner == null) {
+throw new ISE("Null queryRunner! Looks to be some 
segment unmapping action happening");
+  }
+}
+).map(
+queryRunner -> new 
AbstractPrioritizedCallable>(priority)
+{
+  @Override
+  public Iterable call()
+  {
+try {
+  Sequence result = 
queryRunner.run(threadSafeQueryPlus, responseContext);
+  if (result == null) {
+throw new ISE("Got a null result! Segments are 
missing!");
+  }
+
+  List retVal = result.toList();
+  if (retVal == null) {
+throw new ISE("Got a null list of results! WTF?!");
   }
 
-  return exec.submit(
-  new 
AbstractPrioritizedCallable>(priority)
-  {
-@Override
-public Iterable call()
-{
-  try {
-Sequence result = 
input.run(threadSafeQueryPlus, responseContext);
-if (result == null) {
-  throw new ISE("Got a null result! 
Segments are missing!");
-}
-
-List retVal = result.toList();
-if (retVal == null) {
-  throw new ISE("Got a null list of 
results! WTF?!");
-}
-
-return retVal;
-  }
-  catch (QueryInterruptedException e) {
-throw Throwables.propagate(e);
-  }
-  catch (Exception e) {
-log.error(e, "Exception with one of the 
sequences!");
-throw Throwables.propagate(e);
-  }
-}
-  }
-   

[GitHub] QiuMM commented on a change in pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
QiuMM commented on a change in pull request #6263: Add ability to specify list 
of task ports
URL: https://github.com/apache/incubator-druid/pull/6263#discussion_r213762509
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
 ##
 @@ -55,30 +58,54 @@ private static boolean canBind(int portNum)
 
   public synchronized int findUnusedPort()
   {
-int port = chooseNext(startPort);
-while (!canBind(port)) {
-  port = chooseNext(port + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int port = chooseFromCandidates();
+  usedPorts.add(port);
+  return port;
+} else {
+  int port = chooseNext(startPort);
+  while (!canBind(port)) {
+port = chooseNext(port + 1);
+  }
+  usedPorts.add(port);
+  return port;
 }
-usedPorts.add(port);
-return port;
   }
 
-  public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
+  public synchronized Pair<Integer, Integer> findTwoUnusedPorts()
   {
-int firstPort = chooseNext(startPort);
-while (!canBind(firstPort) || !canBind(firstPort + 1)) {
-  firstPort = chooseNext(firstPort + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int firstPort = chooseFromCandidates();
+  int secondPort = chooseFromCandidates();
+  usedPorts.add(firstPort);
+  usedPorts.add(secondPort);
+  return new Pair<>(firstPort, secondPort);
+} else {
+  int firstPort = chooseNext(startPort);
+  while (!canBind(firstPort) || !canBind(firstPort + 1)) {
+firstPort = chooseNext(firstPort + 1);
+  }
+  usedPorts.add(firstPort);
+  usedPorts.add(firstPort + 1);
+  return new Pair<>(firstPort, firstPort + 1);
 }
-usedPorts.add(firstPort);
-usedPorts.add(firstPort + 1);
-return new Pair<>(firstPort, firstPort + 1);
   }
 
   public synchronized void markPortUnused(int port)
   {
 usedPorts.remove(port);
   }
 
+  private int chooseFromCandidates()
+  {
+for (int port : candidatePorts) {
+  if (!usedPorts.contains(port) && canBind(port)) {
+return port;
+  }
+}
+throw new ISE("All ports are Used..");
 
 Review comment:
   IMO, druid nodes should use `druid.indexer.runner.startPort` or 
`druid.indexer.runner.ports` to allocate task ports, rather than using both of 
them at the same time. What's more, it would also confuse users if they 
specified `druid.indexer.runner.ports` but peon processes used ports not 
contained in that list. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
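
For illustration, a hypothetical middleManager runtime.properties showing the
either/or choice described above (the port values are made up):

  # Scheme A: scan upward from a starting port.
  druid.indexer.runner.startPort=8100

  # Scheme B: restrict peons to an explicit list; per this patch it takes
  # precedence over startPort when non-empty.
  druid.indexer.runner.ports=[8100,8101,8102,8103]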



[GitHub] QiuMM commented on a change in pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
QiuMM commented on a change in pull request #6263: Add ability to specify list 
of task ports
URL: https://github.com/apache/incubator-druid/pull/6263#discussion_r213758356
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
 ##
 @@ -55,30 +58,54 @@ private static boolean canBind(int portNum)
 
   public synchronized int findUnusedPort()
   {
-int port = chooseNext(startPort);
-while (!canBind(port)) {
-  port = chooseNext(port + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int port = chooseFromCandidates();
+  usedPorts.add(port);
+  return port;
+} else {
+  int port = chooseNext(startPort);
+  while (!canBind(port)) {
+port = chooseNext(port + 1);
+  }
+  usedPorts.add(port);
+  return port;
 }
-usedPorts.add(port);
-return port;
   }
 
-  public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
+  public synchronized Pair<Integer, Integer> findTwoUnusedPorts()
   {
-int firstPort = chooseNext(startPort);
-while (!canBind(firstPort) || !canBind(firstPort + 1)) {
-  firstPort = chooseNext(firstPort + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int firstPort = chooseFromCandidates();
+  int secondPort = chooseFromCandidates();
+  usedPorts.add(firstPort);
+  usedPorts.add(secondPort);
+  return new Pair<>(firstPort, secondPort);
+} else {
+  int firstPort = chooseNext(startPort);
+  while (!canBind(firstPort) || !canBind(firstPort + 1)) {
+firstPort = chooseNext(firstPort + 1);
+  }
+  usedPorts.add(firstPort);
+  usedPorts.add(firstPort + 1);
+  return new Pair<>(firstPort, firstPort + 1);
 }
-usedPorts.add(firstPort);
-usedPorts.add(firstPort + 1);
-return new Pair<>(firstPort, firstPort + 1);
   }
 
   public synchronized void markPortUnused(int port)
   {
 usedPorts.remove(port);
   }
 
+  private int chooseFromCandidates()
+  {
+for (int port : candidatePorts) {
+  if (!usedPorts.contains(port) && canBind(port)) {
+return port;
+  }
+}
+throw new ISE("All ports are Used..");
 
 Review comment:
   I do not think so. The reason for adding the ability to specify a list of 
task ports is that, in some containerized environments, we need to decide 
ahead of time exactly which ports peon processes are going to use. So if users 
specify `druid.indexer.runner.ports`, they should ensure the ports are usable. 
It may be an illegal thing in users' environments if we use `chooseNext()` to 
find a port when all the candidate ports are unusable; in my use case it is, 
and it may cause potential port conflicts.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213758307
 
 

 ##
 File path: server/src/main/java/io/druid/client/DirectDruidClient.java
 ##
 @@ -590,6 +591,27 @@ private void init()
 {
   if (jp == null) {
 try {
+  // Safety for if we are in a FJP
+  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker()
 
 Review comment:
   `java.util.concurrent.ForkJoinPool.ManagedBlocker` and 
`java.util.concurrent.ForkJoinPool` have information in their java docs. Can 
you explain more about what kind of information you are looking for?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
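
A minimal sketch of the ManagedBlocker pattern those javadocs describe, as in
the DirectDruidClient hunk above: wrapping a potentially blocking section so
the ForkJoinPool can compensate with a spare worker. The latch here is just an
example blocking resource, not anything from the PR:

  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.ForkJoinPool;

  public class ManagedBlockSketch
  {
    public static void awaitInFjp(final CountDownLatch latch) throws InterruptedException
    {
      ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker()
      {
        @Override
        public boolean block() throws InterruptedException
        {
          latch.await(); // the potentially blocking section
          return true;   // true => no further blocking is necessary
        }

        @Override
        public boolean isReleasable()
        {
          return latch.getCount() == 0; // skip blocking if already complete
        }
      });
    }
  }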



[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213757238
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +305,93 @@ public CachingClusteredClient(
 contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
 contextBuilder.put("bySegment", true);
   }
-  return contextBuilder.build();
+  return Collections.unmodifiableMap(contextBuilder);
 }
 
-Sequence run(final UnaryOperator> timelineConverter)
+Stream> run(final UnaryOperator> timelineConverter)
 {
   @Nullable
   TimelineLookup timeline = 
serverView.getTimeline(query.getDataSource());
   if (timeline == null) {
-return Sequences.empty();
+return Stream.empty();
   }
   timeline = timelineConverter.apply(timeline);
   if (uncoveredIntervalsLimit > 0) {
 computeUncoveredIntervals(timeline);
   }
 
-  final Set segments = computeSegmentsToQuery(timeline);
+  Stream segments = computeSegmentsToQuery(timeline);
   @Nullable
   final byte[] queryCacheKey = computeQueryCacheKey();
   if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+// Materialize then re-stream
+final List materializedSegments = 
segments.collect(Collectors.toList());
+segments = materializedSegments.stream();
+
 @Nullable
 final String prevEtag = (String) 
query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
 @Nullable
-final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
+final String currentEtag = computeCurrentEtag(materializedSegments, 
queryCacheKey);
 if (currentEtag != null && currentEtag.equals(prevEtag)) {
-  return Sequences.empty();
+  return Stream.empty();
 }
   }
 
-  final List> alreadyCachedResults = 
pruneSegmentsWithCachedResults(queryCacheKey, segments);
-  final SortedMap> segmentsByServer = 
groupSegmentsByServer(segments);
-  return new LazySequence<>(() -> {
-List> sequencesByInterval = new 
ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
-addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
-addSequencesFromServer(sequencesByInterval, segmentsByServer);
-return Sequences
-.simple(sequencesByInterval)
-.flatMerge(seq -> seq, query.getResultOrdering());
-  });
+  // This pipeline follows a few general steps:
+  // 1. Fetch cache results - Unfortunately this is an eager operation so 
that the non cached items can
 
 Review comment:
   the problem here is that in our cluster we have enough nodes to where it is 
reasonable for a small datasource to have one segment or less per historical 
node. In such a scenario there will be a large quantity of cache requests (one 
per server) that would have been better to batch at the beginning.
   
   Basically I expect an increase in load on the cache system due to lack of 
ability to batch fetch cache results if such an approach were taken. That is a 
significant change in workflow compared to the implementation in `/master` 
where the cached results are fetched in bulk first, with a limit on the qty of 
results that can be cached at broker per call.
   
   As a bit of context: by allowing a limit on the qty of results per batch 
call at the broker level, it allows us to not even try to fetch, say, 1M 
results if we know our cache system can only probably return 10k results in the 
timeout limit given.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
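
A sketch of the batching idea described above, bounding each bulk cache call
to a fixed number of keys instead of issuing one get per segment; bulkGet is a
stand-in for a cache's multi-get API, not Druid's actual interface:

  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.function.Function;

  public class BulkCacheFetchSketch
  {
    public static <K, V> Map<K, V> fetchInBatches(
        List<K> keys,
        int limit,
        Function<List<K>, Map<K, V>> bulkGet
    )
    {
      final Map<K, V> results = new HashMap<>();
      for (int i = 0; i < keys.size(); i += limit) {
        // One round trip per batch of at most `limit` keys, not one per key.
        List<K> batch = keys.subList(i, Math.min(i + limit, keys.size()));
        results.putAll(bulkGet.apply(batch));
      }
      return results;
    }
  }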



[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213756160
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +471,249 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
+Pair key,
+Map> cache
+)
+{
+  final ServerToSegment segment = key.getLhs();
+  final Cache.NamedKey segmentCacheKey = key.getRhs();
+  final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
+  final Optional cachedValue = Optional
+  .ofNullable(cache.get(segmentCacheKey))
+  // Shouldn't happen in practice, but can screw up unit tests where 
cache state is mutated in crazy
+  // ways when the cache returns null instead of an optional.
+  .orElse(Optional.empty());
+  if (!cachedValue.isPresent()) {
+// if populating cache, add segment to list of segments to cache if it 
is not cached
+final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
+addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+  }
+  return Pair.of(segment, cachedValue);
+}
+
+/**
+ * This materializes the input segment stream in order to let the BulkGet 
stuff in the cache system work
+ *
+ * @param queryCacheKey The cache key that is for the query (not-segment) 
portion
+ * @param segments  The segments to check if they are in cache
+ *
+ * @return A stream of the server and segment combinations as well as an 
optional that is present
+ * if a cached value was found
+ */
+private Stream>> 
maybeFetchCacheResults(
 final byte[] queryCacheKey,
-final Set segments
+final Stream segments
 )
 {
   if (queryCacheKey == null) {
-return Collections.emptyList();
+return segments.map(s -> Pair.of(s, Optional.empty()));
   }
-  final List> alreadyCachedResults = 
Lists.newArrayList();
-  Map perSegmentCacheKeys = 
computePerSegmentCacheKeys(segments, queryCacheKey);
-  // Pull cached segments from cache and remove from set of segments to 
query
-  final Map cachedValues = 
computeCachedValues(perSegmentCacheKeys);
-
-  perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
-final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
-
-final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-if (cachedValue != null) {
-  // remove cached segment from set of segments to query
-  segments.remove(segment);
-  alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
-} else if (populateCache) {
-  // otherwise, if populating cache, add segment to list of segments 
to cache
-  final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
-  addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
-}
-  });
-  return alreadyCachedResults;
+  // We materialize the stream here in order to have the bulk cache 
fetching work as expected
+  final List> materializedKeyList = 
computePerSegmentCacheKeys(
+  segments,
+  queryCacheKey
+  ).collect(Collectors.toList());
+
+  // Do bulk fetch
+  final Map> cachedValues = 
computeCachedValues(materializedKeyList.stream())
+  .collect(Pair.mapCollector());
+
+  // A limitation of the cache system is that the cached values are 
returned without passing through the original
+  // objects. This hash join is a way to get the ServerToSegment and 
Optional matched up again
+  return materializedKeyList
+  .stream()
+  .map(serializedPairSegmentAndKey -> 
lookupInCache(serializedPairSegmentAndKey, cachedValues));
 }
 
-private Map computePerSegmentCacheKeys(
-Set segments,
+private Stream> 
computePerSegmentCacheKeys(
+Stream segments,
 byte[] queryCacheKey
 )
 {
-  // cacheKeys map must preserve segment ordering, in order for shards to 
always be combined in the same order
-  Map cacheKeys = Maps.newLinkedHashMap();
-  for (ServerToSegment serverToSegment : segments) {
-final Cache.NamedKey segmentCacheKey = 
CacheUtil.computeSegmentCacheKey(
-serverToSegment.getServer().getSegment().getIdentifier(),
-serverToSegment.getSegmentDescriptor(),
-queryCacheKey
-);
-cacheKeys.put(serverToSegment, segmentCacheKey);
-  }
-  return cacheKeys;
+  return segments
+  .map(serverToSegment -> {
+// cacheKeys 

[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213755919
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -389,169 +471,249 @@ private String computeCurrentEtag(final 
Set segments, @Nullable
   }
 }
 
-private List> pruneSegmentsWithCachedResults(
+private Pair> lookupInCache(
+Pair key,
+Map> cache
+)
+{
+  final ServerToSegment segment = key.getLhs();
+  final Cache.NamedKey segmentCacheKey = key.getRhs();
+  final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
+  final Optional cachedValue = Optional
+  .ofNullable(cache.get(segmentCacheKey))
+  // Shouldn't happen in practice, but can screw up unit tests where 
cache state is mutated in crazy
+  // ways when the cache returns null instead of an optional.
+  .orElse(Optional.empty());
+  if (!cachedValue.isPresent()) {
+// if populating cache, add segment to list of segments to cache if it 
is not cached
+final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
+addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+  }
+  return Pair.of(segment, cachedValue);
+}
+
+/**
+ * This materializes the input segment stream in order to let the BulkGet 
stuff in the cache system work
+ *
+ * @param queryCacheKey The cache key that is for the query (not-segment) 
portion
+ * @param segments  The segments to check if they are in cache
+ *
+ * @return A stream of the server and segment combinations as well as an 
optional that is present
+ * if a cached value was found
+ */
+private Stream>> 
maybeFetchCacheResults(
 final byte[] queryCacheKey,
-final Set segments
+final Stream segments
 )
 {
   if (queryCacheKey == null) {
-return Collections.emptyList();
+return segments.map(s -> Pair.of(s, Optional.empty()));
   }
-  final List> alreadyCachedResults = 
Lists.newArrayList();
-  Map perSegmentCacheKeys = 
computePerSegmentCacheKeys(segments, queryCacheKey);
-  // Pull cached segments from cache and remove from set of segments to 
query
-  final Map cachedValues = 
computeCachedValues(perSegmentCacheKeys);
-
-  perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
-final Interval segmentQueryInterval = 
segment.getSegmentDescriptor().getInterval();
-
-final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-if (cachedValue != null) {
-  // remove cached segment from set of segments to query
-  segments.remove(segment);
-  alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
-} else if (populateCache) {
-  // otherwise, if populating cache, add segment to list of segments 
to cache
-  final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
-  addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
-}
-  });
-  return alreadyCachedResults;
+  // We materialize the stream here in order to have the bulk cache 
fetching work as expected
+  final List> materializedKeyList = 
computePerSegmentCacheKeys(
+  segments,
+  queryCacheKey
+  ).collect(Collectors.toList());
+
+  // Do bulk fetch
+  final Map> cachedValues = 
computeCachedValues(materializedKeyList.stream())
+  .collect(Pair.mapCollector());
+
+  // A limitation of the cache system is that the cached values are 
returned without passing through the original
+  // objects. This hash join is a way to get the ServerToSegment and 
Optional matched up again
+  return materializedKeyList
+  .stream()
+  .map(serializedPairSegmentAndKey -> 
lookupInCache(serializedPairSegmentAndKey, cachedValues));
 }
 
-private Map computePerSegmentCacheKeys(
-Set segments,
+private Stream> 
computePerSegmentCacheKeys(
+Stream segments,
 byte[] queryCacheKey
 )
 {
-  // cacheKeys map must preserve segment ordering, in order for shards to 
always be combined in the same order
-  Map cacheKeys = Maps.newLinkedHashMap();
-  for (ServerToSegment serverToSegment : segments) {
-final Cache.NamedKey segmentCacheKey = 
CacheUtil.computeSegmentCacheKey(
-serverToSegment.getServer().getSegment().getIdentifier(),
-serverToSegment.getSegmentDescriptor(),
-queryCacheKey
-);
-cacheKeys.put(serverToSegment, segmentCacheKey);
-  }
-  return cacheKeys;
+  return segments
+  .map(serverToSegment -> {
+// cacheKeys 

[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213754836
 
 

 ##
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##
 @@ -242,74 +305,93 @@ public CachingClusteredClient(
 contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
 contextBuilder.put("bySegment", true);
   }
-  return contextBuilder.build();
+  return Collections.unmodifiableMap(contextBuilder);
 }
 
-Sequence run(final UnaryOperator> timelineConverter)
+Stream> run(final UnaryOperator> timelineConverter)
 {
   @Nullable
   TimelineLookup timeline = 
serverView.getTimeline(query.getDataSource());
   if (timeline == null) {
-return Sequences.empty();
+return Stream.empty();
   }
   timeline = timelineConverter.apply(timeline);
   if (uncoveredIntervalsLimit > 0) {
 computeUncoveredIntervals(timeline);
   }
 
-  final Set segments = computeSegmentsToQuery(timeline);
+  Stream segments = computeSegmentsToQuery(timeline);
   @Nullable
   final byte[] queryCacheKey = computeQueryCacheKey();
   if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+// Materialize then re-stream
 
 Review comment:
   for `computeCurrentEtag`, added more comment context


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213754069
 
 

 ##
 File path: 
processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
 ##
 @@ -70,87 +74,115 @@ public ChainedExecutionQueryRunner(
   QueryRunner... queryables
   )
   {
-this(exec, queryWatcher, Arrays.asList(queryables));
+this(exec, queryWatcher, Arrays.stream(queryables));
   }
 
   public ChainedExecutionQueryRunner(
   ExecutorService exec,
   QueryWatcher queryWatcher,
   Iterable> queryables
   )
+  {
+this(exec, queryWatcher, StreamSupport.stream(queryables.spliterator(), 
false));
+  }
+
+  public ChainedExecutionQueryRunner(
+  ExecutorService exec,
+  QueryWatcher queryWatcher,
+  Stream> queryables
+  )
   {
 // listeningDecorator will leave PrioritizedExecutorService unchanged,
 // since it already implements ListeningExecutorService
 this.exec = MoreExecutors.listeningDecorator(exec);
-this.queryables = Iterables.unmodifiableIterable(queryables);
 this.queryWatcher = queryWatcher;
+this.queryables = queryables;
   }
 
   @Override
   public Sequence run(final QueryPlus queryPlus, final Map responseContext)
   {
-Query query = queryPlus.getQuery();
+final Query query = queryPlus.getQuery();
 final int priority = QueryContexts.getPriority(query);
-final Ordering ordering = query.getResultOrdering();
+final Ordering ordering = query.getResultOrdering();
 final QueryPlus threadSafeQueryPlus = 
queryPlus.withoutThreadUnsafeState();
-return new BaseSequence>(
+return new BaseSequence<>(
 new BaseSequence.IteratorMaker>()
 {
   @Override
   public Iterator make()
   {
 // Make it a List<> to materialize all of the values (so that it 
will submit everything to the executor)
-ListenableFuture>> futures = Futures.allAsList(
-Lists.newArrayList(
-Iterables.transform(
-queryables,
-input -> {
-  if (input == null) {
-throw new ISE("Null queryRunner! Looks to be some 
segment unmapping action happening");
+final ListenableFuture>> futures = 
GuavaUtils.allFuturesAsList(
+queryables.peek(
 
 Review comment:
   Yes sounds good


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] drcrallen commented on issue #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on issue #5913: Move Caching Cluster Client to java streams 
and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#issuecomment-417022287
 
 
   @leventov / @jihoonson looks like some of the comments can't be responded to 
directly for some reason; maybe they are on a commit instead of the PR? I'll 
see if I can figure out what's going on and get responses to them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213752655
 
 

 ##
 File path: common/src/test/java/io/druid/concurrent/ExecsTest.java
 ##
 @@ -55,6 +54,20 @@ public void testBlockingExecutorServiceThreeCapacity() 
throws Exception
 runTest(3);
   }
 
+  @Test
+  public void testNameFormatGood() throws Exception
+  {
+Execs.checkThreadNameFormat("good-%s");
+Execs.checkThreadNameFormat("good-%d");
+Execs.checkThreadNameFormat("whoops");
+  }
+
+  @Test(expected = IllegalFormatException.class)
+  public void testNameForamtBad() throws Exception
 
 Review comment:
   fixing


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges

2018-08-29 Thread GitBox
drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213752459
 
 

 ##
 File path: common/src/test/java/io/druid/concurrent/ExecsTest.java
 ##
 @@ -55,6 +54,20 @@ public void testBlockingExecutorServiceThreeCapacity() 
throws Exception
 runTest(3);
   }
 
+  @Test
+  public void testNameFormatGood() throws Exception
 
 Review comment:
   habit :) removing


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] a2l007 commented on a change in pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
a2l007 commented on a change in pull request #6263: Add ability to specify list 
of task ports
URL: https://github.com/apache/incubator-druid/pull/6263#discussion_r213748077
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
 ##
 @@ -27,16 +27,19 @@
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.SocketException;
+import java.util.List;
 import java.util.Set;
 
 public class PortFinder
 {
   private final Set<Integer> usedPorts = Sets.newHashSet();
   private final int startPort;
+  private final List<Integer> candidatePorts;
 
-  public PortFinder(int startPort)
+  public PortFinder(int startPort, List<Integer> candidatePorts)
 
 Review comment:
   Instead of modifying the existing constructor, you could add a separate 
constructor that takes both properties, to be used only when `candidatePorts` 
is non-empty.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
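
A sketch of that suggestion (not the PR's code): keep the original one-argument
constructor and chain it to a new two-argument one, so existing callers are
untouched:

  public PortFinder(int startPort)
  {
    // Existing callers keep this signature; no candidate ports configured.
    this(startPort, null);
  }

  public PortFinder(int startPort, List<Integer> candidatePorts)
  {
    this.startPort = startPort;
    this.candidatePorts = candidatePorts;
  }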



[GitHub] a2l007 commented on a change in pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
a2l007 commented on a change in pull request #6263: Add ability to specify list 
of task ports
URL: https://github.com/apache/incubator-druid/pull/6263#discussion_r213741762
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
 ##
 @@ -55,30 +58,54 @@ private static boolean canBind(int portNum)
 
   public synchronized int findUnusedPort()
   {
-int port = chooseNext(startPort);
-while (!canBind(port)) {
-  port = chooseNext(port + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int port = chooseFromCandidates();
+  usedPorts.add(port);
+  return port;
+} else {
+  int port = chooseNext(startPort);
+  while (!canBind(port)) {
+port = chooseNext(port + 1);
+  }
+  usedPorts.add(port);
+  return port;
 }
-usedPorts.add(port);
-return port;
   }
 
-  public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
+  public synchronized Pair<Integer, Integer> findTwoUnusedPorts()
   {
-int firstPort = chooseNext(startPort);
-while (!canBind(firstPort) || !canBind(firstPort + 1)) {
-  firstPort = chooseNext(firstPort + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int firstPort = chooseFromCandidates();
+  int secondPort = chooseFromCandidates();
+  usedPorts.add(firstPort);
+  usedPorts.add(secondPort);
+  return new Pair<>(firstPort, secondPort);
+} else {
+  int firstPort = chooseNext(startPort);
+  while (!canBind(firstPort) || !canBind(firstPort + 1)) {
+firstPort = chooseNext(firstPort + 1);
+  }
+  usedPorts.add(firstPort);
+  usedPorts.add(firstPort + 1);
+  return new Pair<>(firstPort, firstPort + 1);
 }
-usedPorts.add(firstPort);
-usedPorts.add(firstPort + 1);
-return new Pair<>(firstPort, firstPort + 1);
   }
 
   public synchronized void markPortUnused(int port)
   {
 usedPorts.remove(port);
   }
 
+  private int chooseFromCandidates()
+  {
+for (int port : candidatePorts) {
+  if (!usedPorts.contains(port) && canBind(port)) {
+return port;
+  }
+}
+throw new ISE("All ports are Used..");
 
 Review comment:
   Instead of throwing an exception, I guess it would be better if 
`chooseNext()` is used to find a port in case all the candidate ports are 
unusable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] a2l007 commented on a change in pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
a2l007 commented on a change in pull request #6263: Add ability to specify list 
of task ports
URL: https://github.com/apache/incubator-druid/pull/6263#discussion_r213742999
 
 

 ##
 File path: docs/content/configuration/index.md
 ##
 @@ -1040,6 +1040,7 @@ Middle managers pass their configurations down to their 
child peons. The middle
 |`druid.indexer.runner.javaOptsArray`|A json array of strings to be passed in 
as options to the peon's jvm. This is additive to javaOpts and is recommended 
for properly handling arguments which contain quotes or spaces like 
`["-XX:OnOutOfMemoryError=kill -9 %p"]`|`[]`|
 |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can 
be created in Zookeeper.|524288|
 |`druid.indexer.runner.startPort`|Starting port used for peon processes, 
should be greater than 1023.|8100|
+|`druid.indexer.runner.ports`|A json array of integers to specify ports that 
used for peon processes. This property is an alternative to 
`druid.indexer.runner.startPort`. If specified and non-empty, ports for one 
peon process will be chosed from these ports rather than using 
`druid.indexer.runner.startPort` to allocate ports.|`[]`|
 
 Review comment:
   s/chosed/chosen


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] QiuMM opened a new pull request #6263: Add ability to specify list of task ports

2018-08-29 Thread GitBox
QiuMM opened a new pull request #6263: Add ability to specify list of task ports
URL: https://github.com/apache/incubator-druid/pull/6263
 
 
   Try to fix #6154.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] QiuMM closed pull request #6262: Add ability to specify list of task ports

2018-08-29 Thread GitBox
QiuMM closed pull request #6262: Add ability to specify list of task ports
URL: https://github.com/apache/incubator-druid/pull/6262
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/content/configuration/index.md 
b/docs/content/configuration/index.md
index 5f970c032ea..f959b89bd6b 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1040,6 +1040,7 @@ Middle managers pass their configurations down to their 
child peons. The middle
 |`druid.indexer.runner.javaOptsArray`|A json array of strings to be passed in 
as options to the peon's jvm. This is additive to javaOpts and is recommended 
for properly handling arguments which contain quotes or spaces like 
`["-XX:OnOutOfMemoryError=kill -9 %p"]`|`[]`|
 |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can 
be created in Zookeeper.|524288|
 |`druid.indexer.runner.startPort`|Starting port used for peon processes, 
should be greater than 1023.|8100|
+|`druid.indexer.runner.ports`|A json array of integers to specify ports that 
used for peon processes. This property is an alternative to 
`druid.indexer.runner.startPort`. If specified and non-empty, ports for one 
peon process will be chosed from these ports rather than using 
`druid.indexer.runner.startPort` to allocate ports.|`[]`|
 |`druid.indexer.runner.separateIngestionEndpoint`|*Deprecated.* Use separate 
server and consequently separate jetty thread pool for ingesting events. Not 
supported with TLS.|false|
 |`druid.worker.ip`|The IP of the worker.|localhost|
 |`druid.worker.version`|Version identifier for the middle manager.|0|
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
index cf4fcdb7c11..ca577c6b82b 100644
--- 
a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
@@ -128,7 +128,7 @@ public ForkingTaskRunner(
 this.taskLogPusher = taskLogPusher;
 this.jsonMapper = jsonMapper;
 this.node = node;
-this.portFinder = new PortFinder(config.getStartPort());
+this.portFinder = new PortFinder(config.getStartPort(), config.getPorts());
 this.exec = MoreExecutors.listeningDecorator(
 Execs.multiThreaded(workerConfig.getCapacity(), 
"forking-task-runner-%d")
 );
@@ -236,7 +236,7 @@ public TaskStatus call()
 
 if (node.isEnablePlaintextPort()) {
   if (config.isSeparateIngestionEndpoint()) {
-Pair<Integer, Integer> portPair = portFinder.findTwoConsecutiveUnusedPorts();
+Pair<Integer, Integer> portPair = portFinder.findTwoUnusedPorts();
 childPort = portPair.lhs;
 childChatHandlerPort = portPair.rhs;
   } else {
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java 
b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
index 7a34ee49e8e..1eecf61da56 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
@@ -27,16 +27,19 @@
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.SocketException;
+import java.util.List;
 import java.util.Set;
 
 public class PortFinder
 {
   private final Set<Integer> usedPorts = Sets.newHashSet();
   private final int startPort;
+  private final List<Integer> candidatePorts;
 
-  public PortFinder(int startPort)
+  public PortFinder(int startPort, List<Integer> candidatePorts)
   {
 this.startPort = startPort;
+this.candidatePorts = candidatePorts;
   }
 
   private static boolean canBind(int portNum)
@@ -55,23 +58,37 @@ private static boolean canBind(int portNum)
 
   public synchronized int findUnusedPort()
   {
-int port = chooseNext(startPort);
-while (!canBind(port)) {
-  port = chooseNext(port + 1);
+if (candidatePorts != null && !candidatePorts.isEmpty()) {
+  int port = chooseFromCandidates();
+  usedPorts.add(port);
+  return port;
+} else {
+  int port = chooseNext(startPort);
+  while (!canBind(port)) {
+port = chooseNext(port + 1);
+  }
+  usedPorts.add(port);
+  return port;
 }
-usedPorts.add(port);
-return port;
   }
 
-  public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
+  public synchronized Pair<Integer, Integer> findTwoUnusedPorts()
   {
-int firstPort = chooseNext(startPort);
-while (!canBind(firstPort) || !canBind(firstPort + 1)) {
-  firstPort = chooseNext(firstPort + 1);
+if 

[GitHub] a2l007 commented on issue #6210: Csv with Header giving 'Success' but not loading any data

2018-08-29 Thread GitBox
a2l007 commented on issue #6210: Csv with Header giving 'Success' but not 
loading any data
URL: 
https://github.com/apache/incubator-druid/issues/6210#issuecomment-416975853
 
 
   Looks like an issue with your data not adhering to the csv format. As you 
can see from the logs, all the 640 rows were unparseable. Please double check 
your data and if you need further help, post a sample of the data that you're 
attempting to index.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
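
For reference, a minimal csv parseSpec of the kind involved here; the column
and timestamp names are placeholders, not taken from the reporter's spec:

  "parseSpec": {
    "format": "csv",
    "hasHeaderRow": true,
    "timestampSpec": {"column": "time", "format": "auto"},
    "dimensionsSpec": {"dimensions": ["page", "user"]}
  }

Rows whose layout does not match the declared format (wrong delimiter, wrong
column count, unparseable timestamp) are counted as unparseable, which would
explain the 640 rejected rows in the logs.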



[GitHub] nishantmonu51 commented on a change in pull request #5957: Renamed 'Generic Column' -> 'Numeric Column'; Fixed a few resource leaks in processing; Fixed a bug in SingleStringInputDimensionSel

2018-08-29 Thread GitBox
nishantmonu51 commented on a change in pull request #5957: Renamed 'Generic 
Column' -> 'Numeric Column'; Fixed a few resource leaks in processing; Fixed a 
bug in SingleStringInputDimensionSelector; misc refinements
URL: https://github.com/apache/incubator-druid/pull/5957#discussion_r213655561
 
 

 ##
 File path: processing/src/main/java/io/druid/segment/StorageAdapter.java
 ##
 @@ -39,12 +39,8 @@
   Iterable getAvailableMetrics();
 
   /**
-   * Returns the number of distinct values for the given dimension column
-   * For dimensions of unknown cardinality, e.g. __time this currently returns
-   * Integer.MAX_VALUE
-   *
-   * @param column
-   * @return
+   * Returns the number of distinct values for the given column if known, or 
{@link Integer#MAX_VALUE} is unknown or
 
 Review comment:
   typo "is unknown" -> "if unknown"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] QiuMM opened a new pull request #6262: support specify list of task ports

2018-08-29 Thread GitBox
QiuMM opened a new pull request #6262: support specify list of task ports
URL: https://github.com/apache/incubator-druid/pull/6262
 
 
   Try to fix #6154.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] ranjan321 commented on issue #6210: Csv with Header giving 'Success' but not loading any data

2018-08-29 Thread GitBox
ranjan321 commented on issue #6210: Csv with Header giving 'Success' but not 
loading any data
URL: 
https://github.com/apache/incubator-druid/issues/6210#issuecomment-416881656
 
 
   Still I didn't get any response. Please look into this issue, as it is urgent.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] waixiaoyu commented on issue #6252: Deadlock may be in TaskMaster when stopping

2018-08-29 Thread GitBox
waixiaoyu commented on issue #6252: Deadlock may be in TaskMaster when stopping
URL: 
https://github.com/apache/incubator-druid/issues/6252#issuecomment-416838393
 
 
   @himanshug No, my ZooKeeper is used by other programs, so I just want to shut 
down the Druid cluster, except ZooKeeper.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] QiuMM commented on issue #6254: fix opentsdb emitter occupy 100%(#6247)

2018-08-29 Thread GitBox
QiuMM commented on issue #6254: fix opentsdb emitter occupy 100%(#6247)
URL: https://github.com/apache/incubator-druid/pull/6254#issuecomment-416835680
 
 
   I have fixed what you mentioned, @zhaojiandong.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] QiuMM commented on a change in pull request #6251: fix opentsdb emitter always be running

2018-08-29 Thread GitBox
QiuMM commented on a change in pull request #6251: fix opentsdb emitter always 
be running
URL: https://github.com/apache/incubator-druid/pull/6251#discussion_r213553407
 
 

 ##
 File path: docs/content/development/extensions-contrib/opentsdb-emitter.md
 ##
 @@ -18,10 +18,11 @@ All the configuration parameters for the opentsdb emitter are under `druid.emitt
 ||---|-|---|
 |`druid.emitter.opentsdb.host`|The host of the OpenTSDB server.|yes|none|
 |`druid.emitter.opentsdb.port`|The port of the OpenTSDB server.|yes|none|
-|`druid.emitter.opentsdb.connectionTimeout`|Connection timeout(in milliseconds).|no|2000|
-|`druid.emitter.opentsdb.readTimeout`|Read timeout(in milliseconds).|no|2000|
+|`druid.emitter.opentsdb.connectionTimeout`|`Jersey client` connection timeout(in milliseconds).|no|2000|
+|`druid.emitter.opentsdb.readTimeout`|`Jersey client` read timeout(in milliseconds).|no|2000|
 |`druid.emitter.opentsdb.flushThreshold`|Queue flushing threshold.(Events will be sent as one batch)|no|100|
 |`druid.emitter.opentsdb.maxQueueSize`|Maximum size of the queue used to buffer events.|no|1000|
+|`druid.emitter.opentsdb.consumeDelay`|Queue consuming delay(in milliseconds).|no|1|
 
 Review comment:
   Done. 
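
   For reference, the parameters documented in that table combine into a
   runtime configuration along these lines (a minimal sketch; the host and
   port values are placeholders):

       druid.emitter=opentsdb
       druid.emitter.opentsdb.host=tsdb.example.com
       druid.emitter.opentsdb.port=4242
       # Optional tuning, defaults as listed in the table above:
       druid.emitter.opentsdb.flushThreshold=100
       druid.emitter.opentsdb.maxQueueSize=1000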


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] QiuMM commented on a change in pull request #6251: fix opentsdb emitter always be running

2018-08-29 Thread GitBox
QiuMM commented on a change in pull request #6251: fix opentsdb emitter always 
be running
URL: https://github.com/apache/incubator-druid/pull/6251#discussion_r213552927
 
 

 ##
 File path: extensions-contrib/opentsdb-emitter/src/main/java/io/druid/emitter/opentsdb/OpentsdbSender.java
 ##
 @@ -109,13 +141,11 @@ private void sendEvents()
     @Override
     public void run()
     {
-      while (running) {
-        if (!eventQueue.isEmpty()) {
-          OpentsdbEvent event = eventQueue.poll();
-          events.add(event);
-          if (events.size() >= flushThreshold) {
-            sendEvents();
-          }
+      while (!eventQueue.isEmpty() && !scheduler.isShutdown()) {
+        OpentsdbEvent event = eventQueue.poll();
+        events.add(event);
+        if (events.size() >= flushThreshold) {
+          sendEvents();
 
 Review comment:
   Actually, I used `scheduleWithFixedDelay`, not `scheduleAtFixedRate`, in my 
code. Two EventConsumer tasks could run at the same time only when the `flush` 
method is called, and I have fixed that. Thanks for your reminder.
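
   To illustrate the distinction being discussed (a standalone sketch of the 
two ScheduledExecutorService modes, not the emitter's actual code):

       import java.util.concurrent.Executors;
       import java.util.concurrent.ScheduledExecutorService;
       import java.util.concurrent.TimeUnit;

       public class FixedDelayDemo
       {
         public static void main(String[] args) throws Exception
         {
           ScheduledExecutorService scheduler =
               Executors.newSingleThreadScheduledExecutor();

           Runnable consumer = () -> {
             try {
               // Simulate a consumer run that takes longer than the period.
               Thread.sleep(150);
             }
             catch (InterruptedException e) {
               Thread.currentThread().interrupt();
             }
             System.out.println("run finished at " + System.currentTimeMillis());
           };

           // scheduleWithFixedDelay waits the full delay after each run
           // completes, so periodic runs never pile up behind a slow one.
           // scheduleAtFixedRate would instead fire on a fixed clock and
           // start delayed runs back-to-back after a long execution.
           scheduler.scheduleWithFixedDelay(consumer, 0, 100, TimeUnit.MILLISECONDS);

           Thread.sleep(1000);
           scheduler.shutdown();
         }
       }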



This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org


