[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445489#comment-15445489
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user clharris closed the pull request at:

https://github.com/apache/incubator-pirk/pull/74


> Add Streaming Implementation for Apache Storm
> -
>
> Key: PIRK-4
> URL: https://issues.apache.org/jira/browse/PIRK-4
> Project: PIRK
>  Issue Type: Task
>  Components: Responder
>Reporter: Chris Harris
>Assignee: Chris Harris
>
> Per the Pirk Roadmap, this is a feature to add support for Apache Storm



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441379#comment-15441379
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76515127
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bolt class to perform encrypted column multiplication
+ * 
+ * Takes {@code <columnIndex, columnValue>} tuples as input and aggregates (multiplies) the columnValues for a given columnIndex as they are received.
+ * 
+ * EncRowCalcBolts send flush signals to the EncColMultBolts indicating 
that they have finished sending all tuples for a session. Whenever a flush 
signal is
+ * received from a EncRowCalcBolt, the num of received flush signals is 
tallied until each encrypted row has emitted a flush signal (there are 
2^hashBitSize
+ * rows).
+ * 
+ * Once a flush signal has been received from each row, all {@code 
} tuples are sent to the OutputBolt and 
a session_end
+ * signal is sent back to each EncRowCalcBolt.
+ * 
+ * The EncRowCalcBolts buffer their output from the time that they send a 
flush signal to the EncColMultBolts until the time that they receive a 
session_end
+ * signal from the EncColMultBolts.
+ * 
+ */
+public class EncColMultBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(EncColMultBolt.class);
+
+  private OutputCollector outputCollector;
+
+  private BigInteger nSquared;
+  private long numFlushSignals;
+  private Long totalFlushSignals;
+
+  // This is the main object here. It holds column Id -> aggregated product
+  private HashMap resultsMap = new 
HashMap();
--- End diff --

use Map on LHS
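The suggestion above — declare the field against the `Map` interface and let the diamond operator infer the type arguments — can be sketched as follows. This is illustrative only: the element types `<Long, BigInteger>` and the `merge`-based aggregation are assumptions (the generics were stripped from the quoted diff), not the bolt's actual field declaration.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class ResultsMapExample
{
  // LHS uses the Map interface; RHS uses the diamond operator, so the
  // concrete HashMap implementation can be swapped without touching callers.
  private final Map<Long, BigInteger> resultsMap = new HashMap<>();

  // Aggregate (multiply) a column value into the running product for a
  // column, mod N^2, mirroring what a column-multiplication bolt would do.
  public void multiplyInto(long colIndex, BigInteger value, BigInteger nSquared)
  {
    resultsMap.merge(colIndex, value.mod(nSquared), (a, b) -> a.multiply(b).mod(nSquared));
  }

  public BigInteger product(long colIndex)
  {
    return resultsMap.getOrDefault(colIndex, BigInteger.ONE);
  }
}
```

Programming to the interface on the left-hand side keeps the choice of `HashMap` a private implementation detail.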




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441381#comment-15441381
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76515132
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Bolt class to perform the encrypted row calculation
+ * 
+ * Receives a {@code } tuple as input.
+ * 
+ * Encrypts the row data and emits a (column index, encrypted row-value) 
tuple for each encrypted block.
+ * 
+ * Every FLUSH_FREQUENCY seconds, it sends a signal to EncColMultBolt to 
flush its output and resets all counters. At that point, all outgoing (column 
index,
+ * encrypted row-value) tuples are buffered until a SESSION_END signal is 
received back from the EncColMultBolt.
+ */
+public class EncRowCalcBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(EncRowCalcBolt.class);
+
+  private OutputCollector outputCollector;
+  private static Query query;
+  private static boolean querySet = false;
+
+  private Boolean limitHitsPerSelector;
+  private Long maxHitsPerSelector;
+  private Long totalEndSigs;
+  private int rowDivisions;
+  private Boolean saltColumns;
+  private Boolean splitPartitions;
+
+  private Random rand;
+
+  // These are the main data structures used here.
+  private Map hitsByRow = new HashMap();
+  private Map colIndexByRow = new 
HashMap();
+  private List> matrixElements = new 
ArrayList>();
+  private List dataArray = new ArrayList<>();
+
+  private int numEndSigs = 0;
+
+  // These buffered values are used in the case when a session has been 
ejected, but the SESSION_END signal has not been received
+  // yet from the next bolt.
+  private boolean buffering = false;
+  private List> bufferedValues = new 
ArrayList>();
--- End diff --

no need to specify type on RHS




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441378#comment-15441378
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76515120
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
 ---
@@ -321,4 +316,81 @@ public static void loadCacheFromHDFS(FileSystem fs, 
String hdfsFileName, Query q
 
 return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from 
extracted data partitions in the form of ArrayList<>
+   * 
+   * For each row (as indicated by key = hash(selector)), iterates over 
the dataPartitions and calculates the column values.
+   * 
+   * Uses a static LRU cache for the modular exponentiation
+   * 
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   * 
+   * Emits {@code Tuple2<>}
+   */
+  public static List> 
computeEncRow(List dataPartitions, Query query, int rowIndex, int 
colIndex)
+  throws IOException
+  {
+List> returnPairs = new 
ArrayList>();
+
+// Pull the corresponding encrypted row query
+BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+// Initialize the column counter
+long colCounter = colIndex;
+
+logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
+
+// Update the associated column values
+for (int i = 0; i < dataPartitions.size(); ++i)
+{
+  BigInteger part = dataPartitions.get(i);
+
+  BigInteger exp = null;
+  try
+  {
+exp = expCache.get(new 
Tuple3(rowQuery, part, query.getNSquared()));
+  } catch (ExecutionException e)
+  {
+e.printStackTrace();
+break;
+  }
+
+  logger.debug("rowIndex = {} colCounter = {} part = {} part binary = 
{} exp = {} i = {} partition = {} = {}",
+  rowIndex, colCounter, part.toString(), part.toString(2), exp, i, 
dataPartitions.get(i), dataPartitions.get(i).toString(2));
+
+  returnPairs.add(new Tuple2(colCounter, exp));
+
+  ++colCounter;
+}
+
+return returnPairs;
+  }
+
+  public static List> computeEncRow(BigInteger 
part, Query query, int rowIndex, int colIndex) throws IOException
+  {
+List> returnPairs = new 
ArrayList>();
--- End diff --

no need of type on RHS




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441374#comment-15441374
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76515109
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
 ---
@@ -321,4 +316,81 @@ public static void loadCacheFromHDFS(FileSystem fs, 
String hdfsFileName, Query q
 
 return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from 
extracted data partitions in the form of ArrayList<>
+   * 
+   * For each row (as indicated by key = hash(selector)), iterates over 
the dataPartitions and calculates the column values.
+   * 
+   * Uses a static LRU cache for the modular exponentiation
+   * 
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   * 
+   * Emits {@code Tuple2<>}
+   */
+  public static List> 
computeEncRow(List dataPartitions, Query query, int rowIndex, int 
colIndex)
+  throws IOException
+  {
+List> returnPairs = new 
ArrayList>();
--- End diff --

No need of specifying the types on RHS - can just be new ArrayList<>()




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441369#comment-15441369
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76515097
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java ---
@@ -90,7 +127,7 @@ static boolean validateResponderProperties()
 }
 
 String platform = 
SystemConfiguration.getProperty(PLATFORM).toLowerCase();
-if (!platform.equals("mapreduce") && !platform.equals("spark") && 
!platform.equals("standalone"))
+if (!platform.equals("mapreduce") && !platform.equals("spark") && 
!platform.equals("storm") && !platform.equals("standalone"))
--- End diff --

Change this to a switch statement using an enum - the if list is only going to get 
longer with the addition of other backends and streaming and batch versions for 
each of them.
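One way to realize that enum suggestion could look like the sketch below. The `Platform` enum and `isValidPlatform` helper are hypothetical illustrations, not the actual Pirk code; only the four string values come from the quoted `if` condition.

```java
import java.util.Locale;

public class PlatformValidator
{
  // Hypothetical enum of supported responder platforms; a new backend becomes
  // a new constant instead of another clause in a growing if-chain.
  public enum Platform { MAPREDUCE, SPARK, STORM, STANDALONE }

  public static boolean isValidPlatform(String value)
  {
    try
    {
      // valueOf throws IllegalArgumentException for unknown names.
      Platform.valueOf(value.toUpperCase(Locale.ROOT));
      return true;
    }
    catch (IllegalArgumentException e)
    {
      return false;
    }
  }
}
```

Callers can then switch over the resulting `Platform` constant rather than comparing strings.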




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440322#comment-15440322
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user clharris commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76506112
  
--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---
@@ -96,6 +98,33 @@ public QueryInfo(UUID identifierInput, int 
numSelectorsInput, int hashBitSizeInp
 printQueryInfo();
   }
 
+  public QueryInfo(Map queryInfoMap)
--- End diff --

This sounds like a good idea.  If I don't do it in this commit, it will 
likely go in later.




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440315#comment-15440315
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user clharris commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76505942
  
--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
--- End diff --

Not sure how that happened.  I'll clean it up. (As well as the other 
comments you mentioned).




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439980#comment-15439980
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user tellison commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76494284
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java ---
@@ -0,0 +1,130 @@

+/***
--- End diff --

Should follow the comment convention used elsewhere: ```/*``` over ```/***```




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439972#comment-15439972
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user tellison commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76494100
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
 ---
@@ -321,4 +316,80 @@ public static void loadCacheFromHDFS(FileSystem fs, 
String hdfsFileName, Query q
 
 return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from 
extracted data partitions in the form of ArrayList<>
+   * 
+   * For each row (as indicated by key = hash(selector)), iterates over 
the dataPartitions and calculates the column values.
+   * 
+   * Uses a static LRU cache for the modular exponentiation
+   * 
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   * 
+   * Emits {@code Tuple2<>}
+   */
+  public static List> 
computeEncRow(List dataPartitions, Query query, int rowIndex, int 
colIndex)
+  throws IOException
+  {
+List> returnPairs = new 
ArrayList>();
+
+// Pull the corresponding encrypted row query
+BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+// Initialize the column counter
+long colCounter = colIndex;
+
+logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+
+// Update the associated column values
+for (int i = 0; i < dataPartitions.size(); ++i)
+{
+  BigInteger part = dataPartitions.get(i);
+
+  BigInteger exp = null;
+  try
+  {
+exp = expCache.get(new 
Tuple3(rowQuery, part, query.getNSquared()));
+  } catch (ExecutionException e)
+  {
+e.printStackTrace();
--- End diff --

Really continue on exception?
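A sketch of the alternative the reviewer is hinting at: instead of `printStackTrace()` followed by a silent `break`, wrap the failure and rethrow it as the `IOException` the method already declares, so the caller decides how to recover. The `cachedExp` helper below is illustrative, not Pirk's actual cache API.

```java
import java.io.IOException;
import java.math.BigInteger;
import java.util.concurrent.Callable;

public class ExpCacheLookup
{
  // Propagate a failed lookup to the caller with its cause attached,
  // rather than logging the stack trace and continuing with a null result.
  public static BigInteger cachedExp(Callable<BigInteger> lookup) throws IOException
  {
    try
    {
      return lookup.call();
    }
    catch (Exception e)
    {
      throw new IOException("modular exponentiation cache lookup failed", e);
    }
  }
}
```

Failing fast here avoids emitting a row with a missing column product, which would silently corrupt the aggregated response.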




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439968#comment-15439968
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user tellison commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76493980
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
 ---
@@ -94,11 +93,10 @@ public static void loadCacheFromHDFS(FileSystem fs, 
String hdfsFileName, Query q
* Method to compute the encrypted row elements for a query from 
extracted data partitions in the form of Iterable{@link }
* 
* For each row (as indicated by key = hash(selector)), iterates over 
the dataPartitions and calculates the column values.
-   * 
+   * 
--- End diff --

Javadoc spec paragraphs as `<p>`, not `<p/>`.




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439960#comment-15439960
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user tellison commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76493783
  
--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---
@@ -96,6 +98,33 @@ public QueryInfo(UUID identifierInput, int 
numSelectorsInput, int hashBitSizeInp
 printQueryInfo();
   }
 
+  public QueryInfo(Map queryInfoMap)
--- End diff --

Consider moving the serialization logic out of the QueryInfo class.
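That refactoring could look roughly like the sketch below: a dedicated converter owns the map form, so the domain class keeps only its fields. `QueryInfoLite` and its two fields are stand-ins for illustration, not the real `QueryInfo` shape, and `QueryInfoMapConverter` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Stand-in for QueryInfo; the real class carries more fields.
class QueryInfoLite
{
  final UUID identifier;
  final int numSelectors;

  QueryInfoLite(UUID identifier, int numSelectors)
  {
    this.identifier = identifier;
    this.numSelectors = numSelectors;
  }
}

// The map-based (de)serialization lives here instead of in a
// QueryInfo(Map) constructor, keeping the domain class serialization-free.
class QueryInfoMapConverter
{
  static Map<String,Object> toMap(QueryInfoLite info)
  {
    Map<String,Object> m = new HashMap<>();
    m.put("uuid", info.identifier.toString());
    m.put("numSelectors", info.numSelectors);
    return m;
  }

  static QueryInfoLite fromMap(Map<String,Object> m)
  {
    return new QueryInfoLite(UUID.fromString((String) m.get("uuid")), (Integer) m.get("numSelectors"));
  }
}
```

With the converter separated, alternative wire formats (JSON, Storm tuples) can be added without touching `QueryInfo` itself.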




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437097#comment-15437097
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user clharris commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76268780
  
--- Diff: pom.xml ---
@@ -112,12 +112,14 @@
benchmarks
1.8
1.7.21
-   2.6.2
+   2.1
--- End diff --

Maybe I'm confused, but isn't that what I'm doing?...  If you look at where 
I imported storm-core in the pom, I have a bunch of exclusions for logging...



[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437083#comment-15437083
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r7627
  
--- Diff: pom.xml ---
@@ -112,12 +112,14 @@
benchmarks
1.8
1.7.21
-   2.6.2
+   2.1
--- End diff --

Or you could try excluding the logging from Storm when adding Storm as a 
dependency, e.g. you can see how it is being excluded with the ElasticSearch 
dependency in the present pom.




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437078#comment-15437078
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user clharris commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r76265886
  
--- Diff: pom.xml ---
@@ -112,12 +112,14 @@
benchmarks
1.8
1.7.21
-   2.6.2
+   2.1
--- End diff --

It seems that I can't shade out the logging from Storm.  I can shade the 
project's logger.  Or we can keep v 2.1 for now until we do modules and Storm 
has its own module?..  Maybe it's good to shade the project logger though to 
remove all potential problems?.. I'm very new to shading though. So I'm 
entirely open to whatever suggestions.




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428879#comment-15428879
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75554770
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java ---
@@ -0,0 +1,237 @@

+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ 
***/
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.*;
--- End diff --

Avoid import *




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428859#comment-15428859
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75553735
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java ---
@@ -31,10 +31,7 @@
 
 import java.io.IOException;
 import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
--- End diff --

We want to avoid import *, best to specify each import explicitly.




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428718#comment-15428718
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user clharris commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75541067
  
--- Diff: 
src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java ---
@@ -0,0 +1,327 @@

+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ 
***/
+package org.apache.pirk.storm;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.test.utils.BaseTests;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.TestJob;
+import org.json.simple.JSONObject;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.*;
+
+@Category(IntegrationTest.class)
+public class KafkaStormIntegrationTest
+{
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(KafkaStormIntegrationTest.class);
+
+  private static final LocalFileSystemStore localStore = new 
LocalFileSystemStore();
+
+  private static TestingServer zookeeperLocalCluster;
+  private static KafkaServer kafkaLocalBroker;
+  private static ZkClient zkClient;
+
+  private static final String topic = "pirk_test_topic";
+  private static final String kafkaTmpDir = "/tmp/kafka";
+
+  private static File fileQuery;
+  private static File fileQuerier;
+  private static String localStopListFile;
+
+  private QueryInfo queryInfo;
+  private BigInteger nSquared;
+
+  @Test
+  public void testKafkaStormIntegration() throws Exception
+  {
+SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
+SystemConfiguration.setProperty("pir.maxHitsPerSelector", "10");
+SystemConfiguration.setProperty("storm.spout.parallelism", "1");
+SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1");
+

[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428715#comment-15428715
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on the issue:

https://github.com/apache/incubator-pirk/pull/74
  
Travis build fails on an integration test; here's the stack trace:

`org.apache.pirk.storm.KafkaStormIntegrationTest
testKafkaStormIntegration(org.apache.pirk.storm.KafkaStormIntegrationTest)  
Time elapsed: 23.867 sec  <<< ERROR!
java.io.EOFException: null
at 
java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2353)
at 
java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2822)
at 
java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301)
at 
org.apache.pirk.serialization.JavaSerializer.read(JavaSerializer.java:60)
at 
org.apache.pirk.serialization.LocalFileSystemStore.recall(LocalFileSystemStore.java:108)
at 
org.apache.pirk.serialization.LocalFileSystemStore.recall(LocalFileSystemStore.java:90)
at 
org.apache.pirk.storm.KafkaStormIntegrationTest.performDecryption(KafkaStormIntegrationTest.java:317)
at 
org.apache.pirk.storm.KafkaStormIntegrationTest.runTest(KafkaStormIntegrationTest.java:153)
at 
org.apache.pirk.storm.KafkaStormIntegrationTest.testKafkaStormIntegration(KafkaStormIntegrationTest.java:132)
Running org.apache.pirk.wideskies.standalone.StandaloneTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 11.615 sec 
- in org.apache.pirk.wideskies.standalone.StandaloneTest
Running org.apache.pirk.schema.query.LoadQuerySchemaTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.058 sec - 
in org.apache.pirk.schema.query.LoadQuerySchemaTest
Running org.apache.pirk.schema.data.LoadDataSchemaTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.015 sec - 
in org.apache.pirk.schema.data.LoadDataSchemaTest
Results :
Tests in error: 
  
KafkaStormIntegrationTest.testKafkaStormIntegration:132->runTest:153->performDecryption:317
 » EOF`
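The EOFException above is thrown before any object is even read: the `ObjectInputStream` constructor eagerly reads the serialization stream header, so recalling a response from a file that is empty or was never fully written fails at construction, exactly as in `LocalFileSystemStore.recall`. A minimal sketch of that failure mode (the temp-file name is illustrative, not from the test):

```java
import java.io.*;

public class EmptyStreamDemo {
    public static void main(String[] args) throws IOException {
        // An empty file: nothing was ever serialized into it
        File f = File.createTempFile("pirk-response", ".ser");
        f.deleteOnExit();
        try (FileInputStream fis = new FileInputStream(f);
             ObjectInputStream ois = new ObjectInputStream(fis)) { // header read fails here
            System.out.println("unreachable");
        } catch (EOFException e) {
            // Same failure mode as recalling a Response that was never written
            System.out.println("EOFException at construction");
        }
    }
}
```

This suggests the test failure is a race or path problem (the Response file missing or incomplete when `performDecryption` runs) rather than a corrupt object graph.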


> Add Streaming Implementation for Apache Storm
> -
>
> Key: PIRK-4
> URL: https://issues.apache.org/jira/browse/PIRK-4
> Project: PIRK
>  Issue Type: Task
>  Components: Responder
>Reporter: Chris Harris
>Assignee: Chris Harris
>
> Per the Pirk Roadmap, this is a feature to add support for Apache Storm



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428711#comment-15428711
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user clharris commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75540165
  
--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---
@@ -96,6 +98,42 @@ public QueryInfo(UUID identifierInput, int 
numSelectorsInput, int hashBitSizeInp
 printQueryInfo();
   }
 
+  public QueryInfo(Map<String, Object> queryInfoMap)
+  {
+// The Storm Config appears to serialize the map as JSON and read it back with numeric values as longs,
+// so we cast to Long and call .intValue(). However, this didn't work in the PirkHashScheme, so we also
+// fall back to the plain int casts.
+try
+{
+  identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+  queryType = (String) queryInfoMap.get("queryType");
+  numSelectors = ((Long) queryInfoMap.get("numSelectors")).intValue();
+  hashBitSize = ((Long) queryInfoMap.get("hashBitSize")).intValue();
+  hashKey = (String) queryInfoMap.get("hashKey");
+  numBitsPerDataElement = ((Long) 
queryInfoMap.get("numBitsPerDataElement")).intValue();
+  numPartitionsPerDataElement = ((Long) 
queryInfoMap.get("numPartitionsPerDataElement")).intValue();
+  dataPartitionBitSize = ((Long) 
queryInfoMap.get("dataPartitionsBitSize")).intValue();
+  useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+  useHDFSExpLookupTable = (boolean) 
queryInfoMap.get("useHDFSExpLookupTable");
+  embedSelector = (boolean) queryInfoMap.get("embedSelector");
+} catch (ClassCastException e)
+{
+  identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+  queryType = (String) queryInfoMap.get("queryType");
+  numSelectors = (int) queryInfoMap.get("numSelectors");
+  hashBitSize = (int) queryInfoMap.get("hashBitSize");
+  hashKey = (String) queryInfoMap.get("hashKey");
+  numBitsPerDataElement = (int) 
queryInfoMap.get("numBitsPerDataElement");
+  numPartitionsPerDataElement = (int) 
queryInfoMap.get("numPartitionsPerDataElement");
+  dataPartitionBitSize = (int) 
queryInfoMap.get("dataPartitionsBitSize");
+  useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+  useHDFSExpLookupTable = (boolean) 
queryInfoMap.get("useHDFSExpLookupTable");
+  embedSelector = (boolean) queryInfoMap.get("embedSelector");
+
+}
--- End diff --

You're right, the parts that are the same should definitely come out.  I'll 
clean this up.
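One way to remove the duplicated try/catch bodies is to go through `Number`, which covers both the `Long` values produced by the JSON round-trip and plain `Integer` values. A sketch of the idea (the helper name is hypothetical, not from the PR):

```java
import java.util.HashMap;
import java.util.Map;

public class NumericCoercion {
    // Works whether the map value arrived as a Long (JSON round-trip) or an Integer
    static int intValue(Map<String, Object> map, String key) {
        return ((Number) map.get(key)).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> fromJson = new HashMap<>();
        fromJson.put("numSelectors", 10L);      // deserialized by Storm Config as Long

        Map<String, Object> direct = new HashMap<>();
        direct.put("numSelectors", 10);         // stored directly as Integer

        System.out.println(intValue(fromJson, "numSelectors")); // 10
        System.out.println(intValue(direct, "numSelectors"));   // 10
    }
}
```

With this, the String and boolean fields need no special handling and the ClassCastException branch disappears entirely.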




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428705#comment-15428705
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user clharris commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75539766
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
 ---
@@ -319,4 +314,80 @@ public static void loadCacheFromHDFS(FileSystem fs, 
String hdfsFileName, Query q
 
 return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from extracted data partitions in the form of {@code ArrayList<BigInteger>}
+   * 
+   * For each row (as indicated by key = hash(selector)), iterates over 
the dataPartitions and calculates the column values.
+   * 
+   * Uses a static LRU cache for the modular exponentiation
+   * 
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   * 
+   * Emits {@code Tuple2<Long, BigInteger>}
+   */
+  public static ArrayList<Tuple2<Long,BigInteger>> computeEncRow(ArrayList<BigInteger> dataPartitions, Query query, int rowIndex, int colIndex)
+      throws IOException
+  {
+    ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
--- End diff --

Will do.




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428689#comment-15428689
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75538064
  
--- Diff: 
src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java ---
@@ -0,0 +1,327 @@

+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ 
***/
+package org.apache.pirk.storm;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.test.utils.BaseTests;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.TestJob;
+import org.json.simple.JSONObject;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.*;
+
+@Category(IntegrationTest.class)
+public class KafkaStormIntegrationTest
+{
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(KafkaStormIntegrationTest.class);
+
+  private static final LocalFileSystemStore localStore = new 
LocalFileSystemStore();
+
+  private static TestingServer zookeeperLocalCluster;
+  private static KafkaServer kafkaLocalBroker;
+  private static ZkClient zkClient;
+
+  private static final String topic = "pirk_test_topic";
+  private static final String kafkaTmpDir = "/tmp/kafka";
+
+  private static File fileQuery;
+  private static File fileQuerier;
+  private static String localStopListFile;
+
+  private QueryInfo queryInfo;
+  private BigInteger nSquared;
+
+  @Test
+  public void testKafkaStormIntegration() throws Exception
+  {
+SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
+SystemConfiguration.setProperty("pir.maxHitsPerSelector", "10");
+SystemConfiguration.setProperty("storm.spout.parallelism", "1");
+SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1");
+

[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428679#comment-15428679
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75537551
  
--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---
@@ -96,6 +98,42 @@ public QueryInfo(UUID identifierInput, int 
numSelectorsInput, int hashBitSizeInp
 printQueryInfo();
   }
 
+  public QueryInfo(Map<String, Object> queryInfoMap)
+  {
+// The Storm Config appears to serialize the map as JSON and read it back with numeric values as longs,
+// so we cast to Long and call .intValue(). However, this didn't work in the PirkHashScheme, so we also
+// fall back to the plain int casts.
+try
+{
+  identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+  queryType = (String) queryInfoMap.get("queryType");
+  numSelectors = ((Long) queryInfoMap.get("numSelectors")).intValue();
+  hashBitSize = ((Long) queryInfoMap.get("hashBitSize")).intValue();
+  hashKey = (String) queryInfoMap.get("hashKey");
+  numBitsPerDataElement = ((Long) 
queryInfoMap.get("numBitsPerDataElement")).intValue();
+  numPartitionsPerDataElement = ((Long) 
queryInfoMap.get("numPartitionsPerDataElement")).intValue();
+  dataPartitionBitSize = ((Long) 
queryInfoMap.get("dataPartitionsBitSize")).intValue();
+  useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+  useHDFSExpLookupTable = (boolean) 
queryInfoMap.get("useHDFSExpLookupTable");
+  embedSelector = (boolean) queryInfoMap.get("embedSelector");
+} catch (ClassCastException e)
+{
+  identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+  queryType = (String) queryInfoMap.get("queryType");
+  numSelectors = (int) queryInfoMap.get("numSelectors");
+  hashBitSize = (int) queryInfoMap.get("hashBitSize");
+  hashKey = (String) queryInfoMap.get("hashKey");
+  numBitsPerDataElement = (int) 
queryInfoMap.get("numBitsPerDataElement");
+  numPartitionsPerDataElement = (int) 
queryInfoMap.get("numPartitionsPerDataElement");
+  dataPartitionBitSize = (int) 
queryInfoMap.get("dataPartitionsBitSize");
+  useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+  useHDFSExpLookupTable = (boolean) 
queryInfoMap.get("useHDFSExpLookupTable");
+  embedSelector = (boolean) queryInfoMap.get("embedSelector");
+
+}
--- End diff --

Why are we doing the same thing in both the try and catch clauses?




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428680#comment-15428680
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75537583
  
--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---
@@ -151,6 +189,24 @@ public boolean getEmbedSelector()
 return embedSelector;
   }
 
+  public Map<String, Object> toMap()
+  {
+    HashMap<String, Object> queryInfo = new HashMap<>();
--- End diff --

Use Map on LHS
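The "use the interface on the left-hand side" suggestion in a minimal sketch (the keys and values here are illustrative, not the PR's):

```java
import java.util.HashMap;
import java.util.Map;

public class InterfaceTyped {
    public static void main(String[] args) {
        // Declare against the Map interface; only the constructor names the implementation,
        // so callers are not coupled to HashMap and the field can later hold any Map.
        Map<String, Object> queryInfo = new HashMap<>();
        queryInfo.put("numSelectors", 10);
        queryInfo.put("embedSelector", true);
        System.out.println(queryInfo.size()); // 2
    }
}
```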




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428674#comment-15428674
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75537022
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java ---
@@ -0,0 +1,198 @@

+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ 
***/
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Bolt to compute and output the final Response object for a query
+ * 
+ * Receives {@code } tuples, computes the final 
column product for each colIndex, records the results in the final Response 
object, and
+ * outputs the final Response object for the query.
+ * 
+ * Flush signals are sent to the OutputBolt from the EncColMultBolts via a 
tuple of the form {@code <-1, 0>}. Once a flush signal has been received from 
each
+ * EncColMultBolt (or a timeout is reached), the final column product is 
computed and the final Response is formed and emitted.
+ * 
+ * Currently, the Responses are written to HDFS at the location specified by the outputFile, with the timestamp appended.
+ * 
+ * TODO: -- Enable other Response output locations
+ * 
+ */
+public class OutputBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(OutputBolt.class);
+
+  private OutputCollector outputCollector;
+  private QueryInfo queryInfo;
+  private Response response;
+  private String outputFile;
+  private boolean hdfs;
+  private String hdfsUri;
+  private Integer flushCounter = 0;
+  private ArrayList<Tuple> tuplesToAck = new ArrayList<>();
+  private Integer totalFlushSigs;
+
+  private LocalFileSystemStore localStore;
+  private HadoopFileSystemStore hadoopStore;
+
+  // This latch just serves as a hook for testing.
+  public static CountDownLatch latch = new CountDownLatch(1);
+
+  // This is the main object here. It holds column Id -> product
+  private HashMap<Long, BigInteger> resultsMap = new HashMap<>();
--- End diff --

Use a Map on LHS
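The flush protocol described in the OutputBolt Javadoc above (each EncColMultBolt sends a {@code <-1, 0>} signal; the bolt finalizes once it has heard from all of them) can be sketched independently of Storm. The class and field names below are illustrative, and plain `long` products stand in for the `BigInteger` column math:

```java
import java.util.HashMap;
import java.util.Map;

public class FlushTracker {
    private final int totalFlushSigs;   // number of upstream EncColMultBolt tasks
    private int flushCounter = 0;
    private final Map<Long, Long> resultsMap = new HashMap<>(); // colIndex -> running product

    FlushTracker(int totalFlushSigs) { this.totalFlushSigs = totalFlushSigs; }

    /** Returns true once every upstream bolt has flushed and the response can be emitted. */
    boolean onTuple(long colIndex, long value) {
        if (colIndex == -1) {           // flush signal, not a data tuple
            return ++flushCounter == totalFlushSigs;
        }
        resultsMap.merge(colIndex, value, (a, b) -> a * b); // accumulate the column product
        return false;
    }

    public static void main(String[] args) {
        FlushTracker t = new FlushTracker(2);
        t.onTuple(0, 3);
        t.onTuple(0, 5);
        System.out.println(t.onTuple(-1, 0)); // false: one bolt still pending
        System.out.println(t.onTuple(-1, 0)); // true: all bolts flushed
        System.out.println(t.resultsMap.get(0L)); // 15
    }
}
```

The timeout path mentioned in the Javadoc would simply invoke the same finalize step when the latch or timer fires before all signals arrive.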




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428663#comment-15428663
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75536159
  
--- Diff: pom.xml ---
@@ -248,6 +270,64 @@


 
+
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${storm.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-kafka</artifactId>
+      <version>${storm.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>0.9.0.1</version>
--- End diff --

make this a property ${kafka.version}
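Applied to the pom, the suggestion would look roughly like this (a sketch; the property name follows the reviewer's `${kafka.version}` suggestion):

```xml
<properties>
  <kafka.version>0.9.0.1</kafka.version>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>${kafka.version}</version>
</dependency>
```

Centralizing the version in `<properties>` keeps the Kafka client, broker, and any test dependencies in lockstep when upgrading.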




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428654#comment-15428654
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75535198
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
 ---
@@ -319,4 +314,80 @@ public static void loadCacheFromHDFS(FileSystem fs, 
String hdfsFileName, Query q
 
 return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from extracted data partitions in the form of {@code ArrayList<BigInteger>}
+   * 
+   * For each row (as indicated by key = hash(selector)), iterates over 
the dataPartitions and calculates the column values.
+   * 
+   * Uses a static LRU cache for the modular exponentiation
+   * 
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   * 
+   * Emits {@code Tuple2<Long, BigInteger>}
+   */
+  public static ArrayList<Tuple2<Long,BigInteger>> computeEncRow(ArrayList<BigInteger> dataPartitions, Query query, int rowIndex, int colIndex)
+      throws IOException
+  {
+    ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
+
+// Pull the corresponding encrypted row query
+BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+// Initialize the column counter
+long colCounter = colIndex;
+
+    logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+
+// Update the associated column values
+for (int i = 0; i < dataPartitions.size(); ++i)
+{
+  BigInteger part = dataPartitions.get(i);
+
+  BigInteger exp = null;
+  try
+  {
+        exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+  } catch (ExecutionException e)
+  {
+e.printStackTrace();
+  }
+
+  logger.debug("rowIndex = " + rowIndex + " colCounter = " + 
colCounter + " part = " + part.toString() + " part binary = " + 
part.toString(2) + " exp = "
+  + exp + " i = " + i + " partition = " + dataPartitions.get(i) + 
" = " + dataPartitions.get(i).toString(2));
+
+      returnPairs.add(new Tuple2<>(colCounter, exp));
+
+  ++colCounter;
+}
+
+return returnPairs;
+  }
+
+  public static ArrayList<Tuple2<Long,BigInteger>> computeEncRow(BigInteger part, Query query, int rowIndex, int colIndex) throws IOException
+  {
+    ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
--- End diff --

Use a List on LHS and as return type of method
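The "static LRU cache for the modular exponentiation" mentioned in the Javadoc above can be sketched with only the JDK, using an access-ordered `LinkedHashMap` in place of whatever cache library the PR's `expCache` actually uses (class and method names below are illustrative):

```java
import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.Map;

public class ModExpCache {
    private static final int MAX_ENTRIES = 1000;

    // Access-ordered LinkedHashMap that evicts the least-recently-used entry:
    // a stdlib stand-in for a Guava LoadingCache keyed on (rowQuery, part, N^2).
    private static final Map<String, BigInteger> CACHE =
        new LinkedHashMap<String, BigInteger>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, BigInteger> eldest) {
                return size() > MAX_ENTRIES;
            }
        };

    static BigInteger expModNSquared(BigInteger base, BigInteger exp, BigInteger nSquared) {
        String key = base + ":" + exp + ":" + nSquared; // flatten the triple into a key
        return CACHE.computeIfAbsent(key, k -> base.modPow(exp, nSquared));
    }

    public static void main(String[] args) {
        BigInteger nSquared = BigInteger.valueOf(35L * 35L);
        // 2^10 mod 1225 = 1024; the second call is served from the cache
        System.out.println(expModNSquared(BigInteger.valueOf(2), BigInteger.valueOf(10), nSquared));
    }
}
```

Since `rowQuery` is fixed per row and `part` ranges over a bounded partition alphabet, hit rates on such a cache are high, which is why the column loop avoids recomputing `modPow` for repeated partitions.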




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428646#comment-15428646
 ] 

ASF GitHub Bot commented on PIRK-4:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-pirk/pull/74#discussion_r75534791
  
--- Diff: 
src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java ---
@@ -65,6 +66,13 @@ else if 
(SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("spark"
   ComputeResponse computeResponse = new ComputeResponse(fs);
   computeResponse.performQuery();
 }
+else if 
(SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("storm"))
--- End diff --

We will need to abstract this out as a Strategy pattern - the more backends we 
add (Flink, Spark, Beam, and others in the future), the messier this multi-way 
if-else chain will get.

For fun context, here is a code snippet from IBM SystemML - they are now adding 
Flink support, which would become yet another if-else block: 
https://gist.github.com/smarthi/eb848e46621b7444924f
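A strategy-style dispatch for the responder might look like the following sketch. The interface and registry are hypothetical (not Pirk's API); real entries would wrap `ComputeResponse` or the Storm topology submission:

```java
import java.util.HashMap;
import java.util.Map;

public class ResponderDispatch {
    // Hypothetical strategy interface: one implementation per platform backend
    interface ResponderLauncher {
        String run();
    }

    private static final Map<String, ResponderLauncher> LAUNCHERS = new HashMap<>();
    static {
        // Each backend registers itself; adding Flink or Beam later means one
        // new entry here instead of another else-if branch in the driver.
        LAUNCHERS.put("mapreduce", () -> "running mapreduce responder");
        LAUNCHERS.put("spark", () -> "running spark responder");
        LAUNCHERS.put("storm", () -> "running storm responder");
    }

    static String launch(String platform) {
        ResponderLauncher launcher = LAUNCHERS.get(platform);
        if (launcher == null) {
            throw new IllegalArgumentException("Unknown platform: " + platform);
        }
        return launcher.run();
    }

    public static void main(String[] args) {
        System.out.println(launch("storm"));
    }
}
```

The driver then reads `ResponderProps.PLATFORM` once and delegates, keeping backend-specific wiring out of `ResponderDriver` entirely.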




[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428638#comment-15428638
 ] 

ASF GitHub Bot commented on PIRK-4:
---

GitHub user clharris opened a pull request:

https://github.com/apache/incubator-pirk/pull/74

PIRK-4 [WIP] Add Streaming Implementation for Apache Storm

This is an initial implementation of a streaming version of Pirk that runs on 
Apache Storm.  I am leaving it temporarily as WIP so people have a chance to 
look it over and add feedback.  Right now there is only one integration test, 
which runs the Storm topology 4 times, once with each of the significant 
configuration possibilities.  I wanted to unit test the bolts, but that doesn't 
seem very straightforward given the way the Pirk processing works.  I'll try to 
add some documentation (at minimum a diagram of the Pirk Storm topology) by 
early next week.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/clharris/incubator-pirk pirk-4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-pirk/pull/74.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #74


commit 577fb33d04ce9061d2e88a27595737f30b9c446d
Author: charris 
Date:   2016-07-19T14:33:34Z

Temporarily using Cloudera pom.

commit 41f9042b81a5b683ecbd2c718f98712283acacf9
Author: charris 
Date:   2016-07-19T14:37:36Z

PIRK-4 Initial commit.

commit 0c147c36b377c730c7d6489a82b71958e59cdb2d
Author: charris 
Date:   2016-07-19T14:38:27Z

Adding toMap method and constructor.

commit a30f1a9de6e02dbf55bdc4e771d673b429ac331f
Author: charris 
Date:   2016-07-19T14:39:14Z

Adding functionality for streaming processing.

commit 8d27e5d3f2ff1fec9f2fffd29052189f567d51f0
Author: charris 
Date:   2016-07-19T14:40:17Z

Broke out method to checkDNSHostNameQueryResults.

commit 4df0e31836ec26da00662d92c5eac081c61419b2
Author: charris 
Date:   2016-07-19T15:04:07Z

Added Storm parameters.

commit 126361c31426d8285b4202a1134a653dd210dd4b
Author: eawilliams 
Date:   2016-07-22T14:55:28Z

initial modifications to storm code

commit 9f78a94dec84c7fcb5c09f13ae62598a7700adf4
Author: eawilliams 
Date:   2016-07-22T14:55:57Z

initial modifications to storm code - removing logs

commit 4bdd0c1f2461fdf0a1254bd6369db70b05b3e12a
Author: eawilliams 
Date:   2016-07-22T15:52:06Z

splitPartitions update to storm implementation

commit 180c56161ceffbe7c2d24719f8f9e5071ca632cb
Author: eawilliams 
Date:   2016-07-23T16:44:24Z

changed splitPartitions to false in test

commit 2520fbe8e0e6eb6a9ee8d259e0a5f10f965e916e
Author: charris 
Date:   2016-07-25T15:02:00Z

splitPartitions allows data element partitions to be sent individually.

commit e4e69e0119df4c0659a55bee7d6a5c55a016c9ff
Author: charris 
Date:   2016-07-30T19:41:11Z

Moved hashing into the spout.

commit 3262f0fdd52d62c050c6dc7fb6c6b41c34e99173
Author: charris 
Date:   2016-07-30T19:41:55Z

Added some functionality useful for Storm.

commit 290404531eaaa4442ef611252f4d24b2be7a6371
Author: charris 
Date:   2016-07-30T19:42:12Z

Fixed based on refactoring.

commit ba81c6814d0f0e01974fb934a0625c44676a316e
Author: charris 
Date:   2016-07-31T02:21:55Z

Added Apache headers.

commit 987d2f5254e359aa54ec5969935dda124b4d85a1
Author: charris 
Date:   2016-07-31T20:34:34Z

Changed Scala version of Spark and added curator-test.

commit 3b617c1b6c5ccdc560a2e32e58815b8732f18e58
Author: charris 
Date:   2016-07-31T23:17:34Z

Merging.

commit 22b946cbb361f96f4ecb67fb317d44be1c1b1498
Author: charris 
Date:   2016-07-31T23:18:24Z

Merge branch 'master' of https://github.com/apache/incubator-pirk into storm

commit 204495f8a75d38fd0b51a0970352e2204367e6ff
Author: eawilliams 
Date:   2016-08-01T00:34:32Z

fixed logging and scala version issues via pom modifications; refactored 
storm test package

commit 8f8113727be5ae7de97adf82d8953abb2efbde63
Author: eawilliams 
Date:   2016-08-01T01:57:04Z

Merging - closes clharris/incubator-pirk#1

commit 1f8b42fdbf2326114f0cbce5fdfb7bd9c53306c3
Author: Chris Harris 
Date:   2016-08-01T12:22:36Z

Merge pull request #1 from ellisonanne/chris-storm

fixed logging and scala version issues via pom modifications; refacto…

commit a61c9765bff4018041201d3adc7f81038f5499c6
Author: charris 

[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm

2016-07-22 Thread Suneel Marthi (JIRA)

[ 
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15390109#comment-15390109
 ] 

Suneel Marthi commented on PIRK-4:
--

I have an example Beam application that computes Okapi BM25 score for each word 
in a document of a corpus. See https://en.wikipedia.org/wiki/Okapi_BM25. 

This code can be executed on both Flink and Spark runners by just setting: 


 options.setRunner(FlinkRunner.class);

