[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177244#comment-17177244
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673636877


   @dbw9580 
   The `ClusterTest` class is supposed to start a Drill cluster so that you can 
execute queries.  You should not need to have a Drill cluster running for the 
unit tests to complete.  
   
   I think the reason this isn't doing what you're expecting is that in the 
`initIPFS` function in `IPFSTestSuit` you are creating a plugin with a null 
configuration, and hence the plugin isn't being initialized correctly.   
   
   I stepped through `testSimple()` with the debugger and the `dataset` object 
is `null`, hence the test fails.  My suspicion is that there is one small step 
being missed here.  Could you please take a look and step through this to make 
sure that Drill is being initialized correctly?
   Thanks
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add storage plugin for IPFS
> ---
>
> Key: DRILL-7745
> URL: https://issues.apache.org/jira/browse/DRILL-7745
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.18.0
>Reporter: Bowen Ding
>Assignee: Bowen Ding
>Priority: Major
>
> See introduction here: [https://github.com/bdchain/Minerva]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177232#comment-17177232
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r469944365



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private IPFSContext ipfsContext;
+  private IPFSScanSpec ipfsScanSpec;
+  private IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static long DEFAULT_NODE_SIZE = 1000l;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) 
throws IOException, ExecutionSetupException {
+this(
+((IPFSStoragePlugin) 
pluginRegistry.getPlugin(ipfsStoragePluginConfig)).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+   

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177213#comment-17177213
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673621711


   If I leave an instance of Drill running and then run the unit test 
(`TestIPFSQueries`), then it passes. I think the unit test does not actually 
build and run a full Drill server, which is why the connections are rejected.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add storage plugin for IPFS
> ---
>
> Key: DRILL-7745
> URL: https://issues.apache.org/jira/browse/DRILL-7745
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.18.0
>Reporter: Bowen Ding
>Assignee: Bowen Ding
>Priority: Major
>
> See introduction here: [https://github.com/bdchain/Minerva]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177210#comment-17177210
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673615795


   > I'm still having issues actually getting the unit tests that require the 
IPFS daemon to actually execute.
   
   @cgivre Actually I am having trouble making that test run, too. I keep 
getting errors like "connection rejected: /127.0.0.1:31011" or "Protocol family 
unavailable: /0:0:0:0:0:0:0:1:31011". I can test successfully manually through 
the web ui with drill-embedded, though.
   
   Can you try testing through the web ui, too? The simple dataset should be 
easy to add to IPFS and test:
   
   ```bash
   ipfs object patch set-data $(ipfs object new) 
   ```
   
   This will return the hash of the simple dataset, which is 
`QmcbeavnEofA6NjG7vkpe1yLJo6En6ML4JnDooDn1BbKmR`.
   
   Then run a query through the web ui: ``select * from 
ipfs.`/ipfs/QmcbeavnEofA6NjG7vkpe1yLJo6En6ML4JnDooDn1BbKmR#json` `` .
   If the query takes too long to complete, try reducing the timeout values as 
well as the `max-peers-per-leaf` value in the plugin config.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add storage plugin for IPFS
> ---
>
> Key: DRILL-7745
> URL: https://issues.apache.org/jira/browse/DRILL-7745
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.18.0
>Reporter: Bowen Ding
>Assignee: Bowen Ding
>Priority: Major
>
> See introduction here: [https://github.com/bdchain/Minerva]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177171#comment-17177171
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470098319



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177176#comment-17177176
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470098319



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177161#comment-17177161
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470091789



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@JsonTypeName("IPFSScanSpec")
+public class IPFSScanSpec {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSScanSpec.class);
+
+  public enum Prefix {
+@JsonProperty("ipfs")
+IPFS("ipfs"),
+@JsonProperty("ipns")
+IPNS("ipns");
+
+@JsonProperty("prefix")
+private final String name;
+Prefix(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Prefix of(String what) {
+  switch (what) {
+case "ipfs" :
+  return IPFS;
+case "ipns":
+  return IPNS;
+default:
+  throw new InvalidParameterException("Unsupported prefix: " + what);
+  }
+}
+  }
+
+  public enum Format {
+@JsonProperty("json")
+JSON("json"),
+@JsonProperty("csv")
+CSV("csv");
+
+@JsonProperty("format")
+private final String name;
+Format(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Format of(String what) {
+  switch (what) {
+case "json" :
+  return JSON;
+case "csv":
+  return CSV;
+default:
+  throw new InvalidParameterException("Unsupported format: " + what);
+  }
+}
+  }
+
+  public static Set formats = ImmutableSet.of("json", "csv");
+  private Prefix prefix;
+  private String path;
+  private Format formatExtension;
+  private final IPFSContext ipfsContext;
+
+  @JsonCreator
+  public IPFSScanSpec (@JacksonInject StoragePluginRegistry registry,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("prefix") Prefix prefix,
+   @JsonProperty("format") Format format,
+   @JsonProperty("path") String path) {
+this.ipfsContext = registry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext();
+this.prefix = prefix;
+this.formatExtension = format;
+this.path = path;
+  }
+
+  public IPFSScanSpec (IPFSContext ipfsContext, String path) {
+this.ipfsContext = ipfsContext;
+parsePath(path);
+  }
+
+  private void parsePath(String path) {
+//FIXME: IPFS hashes are actually Base58 encoded, so "0" "O" "I" "l" are 
not valid
+//also CIDs can be encoded with different encodings, not necessarily Base58
+Pattern tableNamePattern = 
Pattern.compile("^/(ipfs|ipns)/([A-Za-z0-9]{46}(/[^#]+)*)(?:#(\\w+))?$");
+Matcher matcher = tableNamePattern.matcher(path);
+if (!matcher.matches()) {
+  throw UserException.validationError().message("Invalid IPFS path in 
query string. Use paths of pattern `/scheme/hashpath#format`, where scheme:= 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177158#comment-17177158
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470091514



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@JsonTypeName("IPFSScanSpec")
+public class IPFSScanSpec {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSScanSpec.class);
+
+  public enum Prefix {
+@JsonProperty("ipfs")
+IPFS("ipfs"),
+@JsonProperty("ipns")
+IPNS("ipns");
+
+@JsonProperty("prefix")
+private final String name;
+Prefix(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Prefix of(String what) {
+  switch (what) {
+case "ipfs" :
+  return IPFS;
+case "ipns":
+  return IPNS;
+default:
+  throw new InvalidParameterException("Unsupported prefix: " + what);
+  }
+}
+  }
+
+  public enum Format {
+@JsonProperty("json")
+JSON("json"),
+@JsonProperty("csv")
+CSV("csv");
+
+@JsonProperty("format")
+private final String name;
+Format(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Format of(String what) {
+  switch (what) {
+case "json" :
+  return JSON;
+case "csv":
+  return CSV;
+default:
+  throw new InvalidParameterException("Unsupported format: " + what);
+  }
+}
+  }
+
+  public static Set formats = ImmutableSet.of("json", "csv");
+  private Prefix prefix;
+  private String path;
+  private Format formatExtension;
+  private final IPFSContext ipfsContext;
+
+  @JsonCreator
+  public IPFSScanSpec (@JacksonInject StoragePluginRegistry registry,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("prefix") Prefix prefix,
+   @JsonProperty("format") Format format,
+   @JsonProperty("path") String path) {
+this.ipfsContext = registry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext();
+this.prefix = prefix;
+this.formatExtension = format;
+this.path = path;
+  }
+
+  public IPFSScanSpec (IPFSContext ipfsContext, String path) {
+this.ipfsContext = ipfsContext;
+parsePath(path);
+  }
+
+  private void parsePath(String path) {
+//FIXME: IPFS hashes are actually Base58 encoded, so "0" "O" "I" "l" are 
not valid

Review comment:
   Again, please either remove this FIXME comment, or include a reference to 
a JIRA that documents what needs to be done. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177154#comment-17177154
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470089240



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-resolve">resolve in IPFS doc</a>.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"&r="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  /**
+   * As defined in 
https://github.com/libp2p/go-libp2p-core/blob/b77fd280f2bfcce22f10a000e8e1d9ec53c47049/routing/query.go#L16
+   */
+  public enum DHTQueryEventType {
+// Sending a query to a peer.
+SendingQuery,
+// Got a response from a peer.
+PeerResponse,
+// Found a "closest" peer (not currently used).
+FinalPeer,
+// Got an error when querying.
+QueryError,
+// Found a provider.
+Provider,
+// Found a value.
+Value,

Review comment:
   No. I included them for the sake of completeness. Should they be removed?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add storage plugin for IPFS
> ---
>
> Key: DRILL-7745
> URL: https://issues.apache.org/jira/browse/DRILL-7745
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
> 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177153#comment-17177153
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470088211



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177152#comment-17177152
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470087869



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177150#comment-17177150
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470087561



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177146#comment-17177146
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470086686



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+ 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177145#comment-17177145
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470085822



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+ 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177142#comment-17177142
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470084386



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+ 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177141#comment-17177141
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470083089



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+ 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177140#comment-17177140
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470082144



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-resolve">resolve</a> in IPFS 
doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  /**
+   * As defined in 
https://github.com/libp2p/go-libp2p-core/blob/b77fd280f2bfcce22f10a000e8e1d9ec53c47049/routing/query.go#L16
+   */
+  public enum DHTQueryEventType {
+// Sending a query to a peer.
+SendingQuery,
+// Got a response from a peer.
+PeerResponse,
+// Found a "closest" peer (not currently used).
+FinalPeer,
+// Got an error when querying.
+QueryError,
+// Found a provider.
+Provider,
+// Found a value.
+Value,

Review comment:
   Are `Value`, `AddingPeer` and `DialingPeer` ever used?  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add storage plugin for IPFS
> ---
>
> Key: DRILL-7745
> URL: https://issues.apache.org/jira/browse/DRILL-7745
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177138#comment-17177138
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470081506



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-resolve">resolve</a> in IPFS 
doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  /**
+   * As defined in 
https://github.com/libp2p/go-libp2p-core/blob/b77fd280f2bfcce22f10a000e8e1d9ec53c47049/routing/query.go#L16
+   */
+  public enum DHTQueryEventType {
+// Sending a query to a peer.
+SendingQuery,
+// Got a response from a peer.
+PeerResponse,
+// Found a "closest" peer (not currently used).
+FinalPeer,
+// Got an error when querying.
+QueryError,
+// Found a provider.
+Provider,
+// Found a value.
+Value,
+// Adding a peer to the query.
+AddingPeer,
+// Dialing a peer.
+DialingPeer;
+  }
+
+  public class DHT {
+/**
+ * Find internet addresses of a given peer.
+ * See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-dht-findpeer">dht/findpeer</a>
 in IPFS doc.
+ * @param id the id of the peer to query
+ * @param timeout timeout value in seconds
+ * @param executor executor
+ * @return List of Multiaddresses of the peer
+ */
+public List findpeerListTimeout(Multihash id, int timeout, 
ExecutorService executor) {
+  AtomicReference> ret = new AtomicReference<>();
+  timeLimitedExec(
+  "name/resolve?arg=" + id,
+  

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177137#comment-17177137
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470080197



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */

Review comment:
   I saw this PR 
(https://github.com/ipfs-shipyard/java-ipfs-http-client/pull/172) was merged!  
Can we:
   1.  Once there is a release with this PR merged, update the `pom.xml` so 
that we are using the "official" library.
   
   Should this now work with all versions of IPFS?  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add storage plugin for IPFS
> ---
>
> Key: DRILL-7745
> URL: https://issues.apache.org/jira/browse/DRILL-7745
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.18.0
>Reporter: Bowen Ding
>Assignee: Bowen Ding
>Priority: Major
>
> See introduction here: [https://github.com/bdchain/Minerva]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177105#comment-17177105
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673552669


   @dbw9580 
   I redownloaded and it built for me.  Please disregard previous comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add storage plugin for IPFS
> ---
>
> Key: DRILL-7745
> URL: https://issues.apache.org/jira/browse/DRILL-7745
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.18.0
>Reporter: Bowen Ding
>Assignee: Bowen Ding
>Priority: Major
>
> See introduction here: [https://github.com/bdchain/Minerva]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177103#comment-17177103
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673543986


   > `TestIPFQueries` fails the checkstyle due to unused imports.
   @cgivre Hmm I don't see any unused imports in this file and my builds are 
passing.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add storage plugin for IPFS
> ---
>
> Key: DRILL-7745
> URL: https://issues.apache.org/jira/browse/DRILL-7745
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.18.0
>Reporter: Bowen Ding
>Assignee: Bowen Ding
>Priority: Major
>
> See introduction here: [https://github.com/bdchain/Minerva]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177102#comment-17177102
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470035068



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private IPFSContext ipfsContext;
+  private IPFSScanSpec ipfsScanSpec;
+  private IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static long DEFAULT_NODE_SIZE = 1000l;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) 
throws IOException, ExecutionSetupException {
+this(
+((IPFSStoragePlugin) 
pluginRegistry.getPlugin(ipfsStoragePluginConfig)).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+   

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177091#comment-17177091
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470025016



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-resolve">resolve</a> in IPFS doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  public class DHT {
+/**
+ * Find internet addresses of a given peer.
+ * See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-dht-findpeer">dht/findpeer</a> in IPFS doc.
+ * @param id the id of the peer to query
+ * @param timeout timeout value in seconds
+ * @param executor executor
+ * @return List of Multiaddresses of the peer
+ */
+public List findpeerListTimeout(Multihash id, int timeout, 
ExecutorService executor) {
+  AtomicReference> ret = new AtomicReference<>();
+  timeLimitedExec(
+  "name/resolve?arg=" + id,
+  timeout,
+  res -> {
+Map peer = (Map) res;

Review comment:
   Made some changes in 39bab37.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177026#comment-17177026
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r469960924



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-resolve">resolve</a> in IPFS doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  public class DHT {
+/**
+ * Find internet addresses of a given peer.
+ * See <a href="https://docs.ipfs.io/reference/http/api/#api-v0-dht-findpeer">dht/findpeer</a> in IPFS doc.
+ * @param id the id of the peer to query
+ * @param timeout timeout value in seconds
+ * @param executor executor
+ * @return List of Multiaddresses of the peer
+ */
+public List findpeerListTimeout(Multihash id, int timeout, 
ExecutorService executor) {
+  AtomicReference> ret = new AtomicReference<>();
+  timeLimitedExec(
+  "name/resolve?arg=" + id,
+  timeout,
+  res -> {
+Map peer = (Map) res;

Review comment:
   I think it's unnecessary to specify all the type parameters. These 
`Map`s are JSON responses from the IPFS daemon, and can be deeply nested. It 
would be best handled by a library to properly define the types and structures 
of these responses, e.g. via DAOs, but the 

[jira] [Commented] (DRILL-7745) Add storage plugin for IPFS

2020-08-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177004#comment-17177004
 ] 

ASF GitHub Bot commented on DRILL-7745:
---

dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r469944365



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private IPFSContext ipfsContext;
+  private IPFSScanSpec ipfsScanSpec;
+  private IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static long DEFAULT_NODE_SIZE = 1000l;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) 
throws IOException, ExecutionSetupException {
+this(
+((IPFSStoragePlugin) 
pluginRegistry.getPlugin(ipfsStoragePluginConfig)).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+