[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445489#comment-15445489 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user clharris closed the pull request at:

    https://github.com/apache/incubator-pirk/pull/74

> Add Streaming Implementation for Apache Storm
> ---------------------------------------------
>
>                 Key: PIRK-4
>                 URL: https://issues.apache.org/jira/browse/PIRK-4
>             Project: PIRK
>          Issue Type: Task
>          Components: Responder
>            Reporter: Chris Harris
>            Assignee: Chris Harris
>
> Per the Pirk Roadmap, this is a feature to add support for Apache Storm

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441379#comment-15441379 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76515127

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bolt class to perform encrypted column multiplication
+ *
+ * Takes {@code} tuples as input and aggregates (multiplies) the columnValues for a given columnIndex as they are received.
+ *
+ * EncRowCalcBolts send flush signals to the EncColMultBolts indicating that they have finished sending all tuples for a session. Whenever a flush signal is
+ * received from an EncRowCalcBolt, the number of received flush signals is tallied until each encrypted row has emitted a flush signal (there are 2^hashBitSize
+ * rows).
+ *
+ * Once a flush signal has been received from each row, all {@code } tuples are sent to the OutputBolt and a session_end
+ * signal is sent back to each EncRowCalcBolt.
+ *
+ * The EncRowCalcBolts buffer their output from the time that they send a flush signal to the EncColMultBolts until the time that they receive a session_end
+ * signal from the EncColMultBolts.
+ */
+public class EncColMultBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EncColMultBolt.class);
+
+  private OutputCollector outputCollector;
+
+  private BigInteger nSquared;
+  private long numFlushSignals;
+  private Long totalFlushSignals;
+
+  // This is the main object here. It holds column Id -> aggregated product
+  private HashMap resultsMap = new HashMap ();
--- End diff --

use Map on LHS
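The review comment asks for the field to be declared against the `Map` interface on the left-hand side rather than the concrete `HashMap`, so the implementation can be swapped without touching callers. A minimal sketch of that style; the class name `ResultsMapSketch` and the Paillier-style modular multiply below are illustrative assumptions, not the PR's actual code:

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class ResultsMapSketch {
  // Declared against the Map interface; HashMap is an implementation detail.
  private final Map<Long, BigInteger> resultsMap = new HashMap<>();

  // Aggregate (multiply) a column value into the running product, mod N^2.
  public void multiplyIn(long colIndex, BigInteger value, BigInteger nSquared) {
    BigInteger current = resultsMap.getOrDefault(colIndex, BigInteger.ONE);
    resultsMap.put(colIndex, current.multiply(value).mod(nSquared));
  }

  public BigInteger get(long colIndex) {
    return resultsMap.getOrDefault(colIndex, BigInteger.ONE);
  }

  public static void main(String[] args) {
    ResultsMapSketch sketch = new ResultsMapSketch();
    BigInteger nSquared = BigInteger.valueOf(1225);
    sketch.multiplyIn(0L, BigInteger.valueOf(6), nSquared);
    sketch.multiplyIn(0L, BigInteger.valueOf(7), nSquared);
    System.out.println(sketch.get(0L)); // 6 * 7 mod 1225 = 42
  }
}
```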
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441381#comment-15441381 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76515132

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Bolt class to perform the encrypted row calculation
+ *
+ * Receives a {@code} tuple as input.
+ *
+ * Encrypts the row data and emits a (column index, encrypted row-value) tuple for each encrypted block.
+ *
+ * Every FLUSH_FREQUENCY seconds, it sends a signal to EncColMultBolt to flush its output and resets all counters. At that point, all outgoing (column index,
+ * encrypted row-value) tuples are buffered until a SESSION_END signal is received back from the EncColMultBolt.
+ */
+public class EncRowCalcBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EncRowCalcBolt.class);
+
+  private OutputCollector outputCollector;
+  private static Query query;
+  private static boolean querySet = false;
+
+  private Boolean limitHitsPerSelector;
+  private Long maxHitsPerSelector;
+  private Long totalEndSigs;
+  private int rowDivisions;
+  private Boolean saltColumns;
+  private Boolean splitPartitions;
+
+  private Random rand;
+
+  // These are the main data structures used here.
+  private Map hitsByRow = new HashMap ();
+  private Map colIndexByRow = new HashMap ();
+  private List > matrixElements = new ArrayList >();
+  private List dataArray = new ArrayList<>();
+
+  private int numEndSigs = 0;
+
+  // These buffered values are used in the case when a session has been ejected, but the SESSION_END signal has not been received
+  // yet from the next bolt.
+  private boolean buffering = false;
+  private List > bufferedValues = new ArrayList >();
--- End diff --

no need to specify type on RHS
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441378#comment-15441378 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76515120

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---
@@ -321,4 +316,81 @@ public static void loadCacheFromHDFS(FileSystem fs, String hdfsFileName, Query q
     return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from extracted data partitions in the form of ArrayList<>
+   *
+   * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
+   *
+   * Uses a static LRU cache for the modular exponentiation
+   *
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   *
+   * Emits {@code Tuple2<>}
+   */
+  public static List > computeEncRow(List dataPartitions, Query query, int rowIndex, int colIndex)
+      throws IOException
+  {
+    List > returnPairs = new ArrayList >();
+
+    // Pull the corresponding encrypted row query
+    BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+    // Initialize the column counter
+    long colCounter = colIndex;
+
+    logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions, rowIndex, colCounter);
+
+    // Update the associated column values
+    for (int i = 0; i < dataPartitions.size(); ++i)
+    {
+      BigInteger part = dataPartitions.get(i);
+
+      BigInteger exp = null;
+      try
+      {
+        exp = expCache.get(new Tuple3 (rowQuery, part, query.getNSquared()));
+      } catch (ExecutionException e)
+      {
+        e.printStackTrace();
+        break;
+      }
+
+      logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}",
+          rowIndex, colCounter, part.toString(), part.toString(2), exp, i, dataPartitions.get(i), dataPartitions.get(i).toString(2));
+
+      returnPairs.add(new Tuple2 (colCounter, exp));
+
+      ++colCounter;
+    }
+
+    return returnPairs;
+  }
+
+  public static List > computeEncRow(BigInteger part, Query query, int rowIndex, int colIndex) throws IOException
+  {
+    List > returnPairs = new ArrayList >();
--- End diff --

no need of type on RHS
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441374#comment-15441374 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76515109

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---
@@ -321,4 +316,81 @@ public static void loadCacheFromHDFS(FileSystem fs, String hdfsFileName, Query q
     return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from extracted data partitions in the form of ArrayList<>
+   *
+   * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
+   *
+   * Uses a static LRU cache for the modular exponentiation
+   *
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   *
+   * Emits {@code Tuple2<>}
+   */
+  public static List > computeEncRow(List dataPartitions, Query query, int rowIndex, int colIndex)
+      throws IOException
+  {
+    List > returnPairs = new ArrayList >();
--- End diff --

No need of specifying the types on RHS - can just be new ArrayList<>()
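The suggestion refers to the Java 7+ diamond operator: the compiler infers the type arguments on the right-hand side from the declared type. A small sketch contrasting the two forms; the class and method names here are illustrative, not from the PR:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class DiamondSketch {
  // Pre-Java-7 style: type arguments repeated on the right-hand side.
  static List<BigInteger> verbose() {
    List<BigInteger> parts = new ArrayList<BigInteger>();
    parts.add(BigInteger.TEN);
    return parts;
  }

  // Java 7+ diamond operator: the compiler infers <BigInteger>.
  static List<BigInteger> diamond() {
    List<BigInteger> parts = new ArrayList<>();
    parts.add(BigInteger.TEN);
    return parts;
  }

  public static void main(String[] args) {
    // Both forms produce identical lists; the diamond form is just less noisy.
    System.out.println(verbose().equals(diamond()));
  }
}
```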
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15441369#comment-15441369 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76515097

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java ---
@@ -90,7 +127,7 @@ static boolean validateResponderProperties()
   }

   String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
-  if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("standalone"))
+  if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("storm") && !platform.equals("standalone"))
--- End diff --

Change this to a switch statement using an enum - the if list is only going to get longer with the addition of other backends, and streaming and batch versions for each of them.
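The enum-plus-switch approach the reviewer suggests could be sketched as below. The `Platform` enum and the method names are hypothetical, not code from the PR; the enum constants mirror the string constants checked in ResponderProps:

```java
import java.util.Locale;

public class PlatformValidation {
  // Hypothetical enum; adding a new backend means adding one constant here.
  enum Platform { MAPREDUCE, SPARK, STORM, STANDALONE }

  // Parse the configured platform string; returns null if unrecognized.
  static Platform parsePlatform(String value) {
    try {
      return Platform.valueOf(value.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      return null;
    }
  }

  static boolean isValidPlatform(String value) {
    Platform p = parsePlatform(value);
    if (p == null) {
      return false;
    }
    // A switch keeps per-platform handling in one place as the list grows.
    switch (p) {
      case MAPREDUCE:
      case SPARK:
      case STORM:
      case STANDALONE:
        return true;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isValidPlatform("storm"));  // true
    System.out.println(isValidPlatform("flink"));  // false
  }
}
```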
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440322#comment-15440322 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user clharris commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76506112

--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---
@@ -96,6 +98,33 @@ public QueryInfo(UUID identifierInput, int numSelectorsInput, int hashBitSizeInp
     printQueryInfo();
   }

+  public QueryInfo(Map queryInfoMap)
--- End diff --

This sounds like a good idea. If I don't do it in this commit, it will likely go in later.
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440315#comment-15440315 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user clharris commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76505942

--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
--- End diff --

Not sure how that happened. I'll clean it up. (As well as the other comments you mentioned.)
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439980#comment-15439980 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user tellison commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76494284

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java ---
@@ -0,0 +1,130 @@
+/***
--- End diff --

Should follow the comment convention used elsewhere: ```/*``` over ```/***```
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439972#comment-15439972 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user tellison commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76494100

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---
@@ -321,4 +316,80 @@ public static void loadCacheFromHDFS(FileSystem fs, String hdfsFileName, Query q
     return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from extracted data partitions in the form of ArrayList<>
+   *
+   * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
+   *
+   * Uses a static LRU cache for the modular exponentiation
+   *
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   *
+   * Emits {@code Tuple2<>}
+   */
+  public static List > computeEncRow(List dataPartitions, Query query, int rowIndex, int colIndex)
+      throws IOException
+  {
+    List > returnPairs = new ArrayList >();
+
+    // Pull the corresponding encrypted row query
+    BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+    // Initialize the column counter
+    long colCounter = colIndex;
+
+    logger.debug("dataPartitions.size() = " + dataPartitions + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+
+    // Update the associated column values
+    for (int i = 0; i < dataPartitions.size(); ++i)
+    {
+      BigInteger part = dataPartitions.get(i);
+
+      BigInteger exp = null;
+      try
+      {
+        exp = expCache.get(new Tuple3 (rowQuery, part, query.getNSquared()));
+      } catch (ExecutionException e)
+      {
+        e.printStackTrace();
--- End diff --

Really continue on exception?
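One way to address the reviewer's concern would be to stop swallowing the `ExecutionException` (printStackTrace then silently breaking out of the loop) and instead wrap it in the `IOException` the method already declares, so callers see the failure. A hedged sketch; the `Cache` interface below is a stand-in for the actual `expCache`, which this sketch does not reproduce:

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class ExpLookup {
  // Stand-in for the modular-exponentiation cache lookup in ComputeEncryptedRow.
  interface Cache {
    long get(long base, long exp, long mod) throws ExecutionException;
  }

  // Rather than printStackTrace() and continuing, propagate the failure
  // as the IOException the enclosing method already declares.
  static long lookup(Cache cache, long base, long exp, long mod) throws IOException {
    try {
      return cache.get(base, exp, mod);
    } catch (ExecutionException e) {
      throw new IOException("modular exponentiation cache lookup failed", e);
    }
  }

  public static void main(String[] args) throws IOException {
    // A trivial cache that computes base^exp mod mod directly.
    Cache modPow = (b, e, m) -> java.math.BigInteger.valueOf(b)
        .modPow(java.math.BigInteger.valueOf(e), java.math.BigInteger.valueOf(m))
        .longValue();
    System.out.println(lookup(modPow, 3, 4, 50)); // 3^4 = 81, 81 mod 50 = 31
  }
}
```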
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439968#comment-15439968 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user tellison commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76493980

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---
@@ -94,11 +93,10 @@ public static void loadCacheFromHDFS(FileSystem fs, String hdfsFileName, Query q
  * Method to compute the encrypted row elements for a query from extracted data partitions in the form of Iterable{@link }
  *
  * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
- *
+ *
--- End diff --

Javadoc spec paragraphs as ``, not ``.
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439960#comment-15439960 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user tellison commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76493783

--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---
@@ -96,6 +98,33 @@ public QueryInfo(UUID identifierInput, int numSelectorsInput, int hashBitSizeInp
     printQueryInfo();
   }

+  public QueryInfo(Map queryInfoMap)
--- End diff --

Consider moving the serialization logic out of the QueryInfo class.
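A sketch of what moving the map-based (de)serialization into a separate converter class might look like. The `QueryInfo` stand-in, its field names, and the converter class name below are all illustrative assumptions, not Pirk's actual code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class QueryInfoMapConverter {
  // Minimal stand-in for QueryInfo; real fields differ.
  static class QueryInfo {
    final UUID identifier;
    final int numSelectors;
    final int hashBitSize;

    QueryInfo(UUID identifier, int numSelectors, int hashBitSize) {
      this.identifier = identifier;
      this.numSelectors = numSelectors;
      this.hashBitSize = hashBitSize;
    }
  }

  // Keeping the Map conversion here leaves QueryInfo itself free of
  // serialization concerns, as the review suggests.
  static Map<String, Object> toMap(QueryInfo info) {
    Map<String, Object> map = new HashMap<>();
    map.put("identifier", info.identifier.toString());
    map.put("numSelectors", info.numSelectors);
    map.put("hashBitSize", info.hashBitSize);
    return map;
  }

  static QueryInfo fromMap(Map<String, Object> map) {
    return new QueryInfo(UUID.fromString((String) map.get("identifier")),
        (Integer) map.get("numSelectors"), (Integer) map.get("hashBitSize"));
  }

  public static void main(String[] args) {
    QueryInfo info = new QueryInfo(UUID.randomUUID(), 100, 12);
    QueryInfo roundTrip = fromMap(toMap(info));
    System.out.println(roundTrip.identifier.equals(info.identifier)); // round-trip preserves fields
  }
}
```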
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437097#comment-15437097 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user clharris commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76268780

--- Diff: pom.xml ---
@@ -112,12 +112,14 @@
   benchmarks
   1.8
   1.7.21
-  2.6.2
+  2.1
--- End diff --

Maybe I'm confused, but isn't that what I'm doing?... If you look at where I imported storm-core in the pom, I have a bunch of exclusions for logging...
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437083#comment-15437083 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r7627

--- Diff: pom.xml ---
@@ -112,12 +112,14 @@
   benchmarks
   1.8
   1.7.21
-  2.6.2
+  2.1
--- End diff --

Or you could try excluding the logging from Storm when adding Storm as a dependency. For example, you can see how Storm is being excluded with the Elasticsearch dependency in the present pom.
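For reference, a Maven dependency exclusion of the kind being discussed might look like the following sketch. The exact logging artifacts storm-core pulls in depend on the Storm version, so the `log4j-slf4j-impl` and `log4j-over-slf4j` exclusions shown here are assumptions to verify against `mvn dependency:tree`:

```xml
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>${storm.version}</version>
  <exclusions>
    <!-- Keep Storm's logging bindings out of Pirk's classpath -->
    <exclusion>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```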
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437078#comment-15437078 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user clharris commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r76265886

--- Diff: pom.xml ---
@@ -112,12 +112,14 @@
   benchmarks
   1.8
   1.7.21
-  2.6.2
+  2.1
--- End diff --

It seems that I can't shade out the logging from Storm. I can shade the project's logger. Or we can keep v2.1 for now until we do modules and Storm has its own module?.. Maybe it's good to shade the project logger though, to remove all potential problems?.. I'm very new to shading, so I'm entirely open to whatever suggestions.
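If shading the project's logger is pursued, that is done with the maven-shade-plugin's `<relocation>` element, which rewrites package names in the shaded jar. A sketch; the `shadedPattern` package name is hypothetical:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <!-- Relocate the project's slf4j classes so they cannot clash with
           the slf4j binding Storm provides on the worker classpath -->
      <relocation>
        <pattern>org.slf4j</pattern>
        <shadedPattern>org.apache.pirk.shaded.org.slf4j</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```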
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428879#comment-15428879 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r75554770

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java ---
@@ -0,0 +1,237 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***/
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.*;
--- End diff --

Avoid import *
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428859#comment-15428859 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r75553735

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java ---
@@ -31,10 +31,7 @@
 import java.io.IOException;
 import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
--- End diff --

We want to avoid import *, best to specify each import explicitly.
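The classic argument for explicit imports is that wildcard imports can make simple names ambiguous when two imported packages define the same class name (e.g. `java.util.List` and `java.awt.List`). A minimal illustration:

```java
// With wildcard imports from both packages, the simple name List is
// ambiguous and the file does not compile:
//
//   import java.util.*;
//   import java.awt.*;
//   List<String> names;   // error: reference to List is ambiguous
//
// Explicit imports make the intent unambiguous:
import java.util.ArrayList;
import java.util.List;

public class ExplicitImports {
  public static void main(String[] args) {
    List<String> names = new ArrayList<>();
    names.add("pirk");
    System.out.println(names.size()); // 1
  }
}
```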
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428718#comment-15428718 ]

ASF GitHub Bot commented on PIRK-4:
-----------------------------------

Github user clharris commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/74#discussion_r75541067

--- Diff: src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java ---
@@ -0,0 +1,327 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***/
+package org.apache.pirk.storm;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.test.utils.BaseTests;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.TestJob;
+import org.json.simple.JSONObject;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.*;
+
+@Category(IntegrationTest.class)
+public class KafkaStormIntegrationTest
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaStormIntegrationTest.class);
+
+  private static final LocalFileSystemStore localStore = new LocalFileSystemStore();
+
+  private static TestingServer zookeeperLocalCluster;
+  private static KafkaServer kafkaLocalBroker;
+  private static ZkClient zkClient;
+
+  private static final String topic = "pirk_test_topic";
+  private static final String kafkaTmpDir = "/tmp/kafka";
+
+  private static File fileQuery;
+  private static File fileQuerier;
+  private static String localStopListFile;
+
+  private QueryInfo queryInfo;
+  private BigInteger nSquared;
+
+  @Test
+  public void testKafkaStormIntegration() throws Exception
+  {
+    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
+    SystemConfiguration.getProperty("pir.maxHitsPerSelector", "10");
+    SystemConfiguration.setProperty("storm.spout.parallelism", "1");
+    SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1");
+
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428715#comment-15428715 ] ASF GitHub Bot commented on PIRK-4: --- Github user smarthi commented on the issue: https://github.com/apache/incubator-pirk/pull/74 Travis build fails on an integration test; here's the stack trace:

```
org.apache.pirk.storm.KafkaStormIntegrationTest
testKafkaStormIntegration(org.apache.pirk.storm.KafkaStormIntegrationTest)  Time elapsed: 23.867 sec  <<< ERROR!
java.io.EOFException: null
	at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2353)
	at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2822)
	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301)
	at org.apache.pirk.serialization.JavaSerializer.read(JavaSerializer.java:60)
	at org.apache.pirk.serialization.LocalFileSystemStore.recall(LocalFileSystemStore.java:108)
	at org.apache.pirk.serialization.LocalFileSystemStore.recall(LocalFileSystemStore.java:90)
	at org.apache.pirk.storm.KafkaStormIntegrationTest.performDecryption(KafkaStormIntegrationTest.java:317)
	at org.apache.pirk.storm.KafkaStormIntegrationTest.runTest(KafkaStormIntegrationTest.java:153)
	at org.apache.pirk.storm.KafkaStormIntegrationTest.testKafkaStormIntegration(KafkaStormIntegrationTest.java:132)

Running org.apache.pirk.wideskies.standalone.StandaloneTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 11.615 sec - in org.apache.pirk.wideskies.standalone.StandaloneTest
Running org.apache.pirk.schema.query.LoadQuerySchemaTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.058 sec - in org.apache.pirk.schema.query.LoadQuerySchemaTest
Running org.apache.pirk.schema.data.LoadDataSchemaTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.015 sec - in org.apache.pirk.schema.data.LoadDataSchemaTest

Results :

Tests in error:
  KafkaStormIntegrationTest.testKafkaStormIntegration:132->runTest:153->performDecryption:317 » EOF
```
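For readers tracing the failure: a `java.io.EOFException` thrown from `ObjectInputStream.readStreamHeader` almost always means the serialized file was empty or truncated when `LocalFileSystemStore.recall` tried to read it, e.g. the topology had not finished writing the Response before the test performed decryption. A minimal stdlib sketch of that failure mode (illustrative, not Pirk code):

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;

public class EmptyStreamDemo
{
  public static void main(String[] args) throws IOException
  {
    boolean sawEof = false;
    try
    {
      // An empty stream has no serialization header, so the
      // ObjectInputStream constructor itself throws EOFException.
      new ObjectInputStream(new ByteArrayInputStream(new byte[0]));
    }
    catch (EOFException e)
    {
      sawEof = true;
    }
    if (!sawEof)
    {
      throw new AssertionError("expected EOFException");
    }
    System.out.println("EOFException as expected");
  }
}
```

This suggests the fix lies in test sequencing (waiting for the output file to be fully written) rather than in the deserializer itself.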
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428711#comment-15428711 ] ASF GitHub Bot commented on PIRK-4: --- Github user clharris commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/74#discussion_r75540165

--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---

```
@@ -96,6 +98,42 @@ public QueryInfo(UUID identifierInput, int numSelectorsInput, int hashBitSizeInp
     printQueryInfo();
   }

+  public QueryInfo(Map queryInfoMap)
+  {
+    // Seemed that the Storm Config would serialize the map as a json and read back in with numeric values as longs.
+    // So had to cast as a long and call .intValue. However, this didn't work in the PirkHashScheme and had to try
+    // the normal way of doing it as well.
+    try
+    {
+      identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+      queryType = (String) queryInfoMap.get("queryType");
+      numSelectors = ((Long) queryInfoMap.get("numSelectors")).intValue();
+      hashBitSize = ((Long) queryInfoMap.get("hashBitSize")).intValue();
+      hashKey = (String) queryInfoMap.get("hashKey");
+      numBitsPerDataElement = ((Long) queryInfoMap.get("numBitsPerDataElement")).intValue();
+      numPartitionsPerDataElement = ((Long) queryInfoMap.get("numPartitionsPerDataElement")).intValue();
+      dataPartitionBitSize = ((Long) queryInfoMap.get("dataPartitionsBitSize")).intValue();
+      useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+      useHDFSExpLookupTable = (boolean) queryInfoMap.get("useHDFSExpLookupTable");
+      embedSelector = (boolean) queryInfoMap.get("embedSelector");
+    } catch (ClassCastException e)
+    {
+      identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+      queryType = (String) queryInfoMap.get("queryType");
+      numSelectors = (int) queryInfoMap.get("numSelectors");
+      hashBitSize = (int) queryInfoMap.get("hashBitSize");
+      hashKey = (String) queryInfoMap.get("hashKey");
+      numBitsPerDataElement = (int) queryInfoMap.get("numBitsPerDataElement");
+      numPartitionsPerDataElement = (int) queryInfoMap.get("numPartitionsPerDataElement");
+      dataPartitionBitSize = (int) queryInfoMap.get("dataPartitionsBitSize");
+      useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+      useHDFSExpLookupTable = (boolean) queryInfoMap.get("useHDFSExpLookupTable");
+      embedSelector = (boolean) queryInfoMap.get("embedSelector");
+    }
```

--- End diff --

You're right, the parts that are the same should definitely come out. I'll clean this up.
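One way to hoist out the duplicated assignments discussed above is to normalize the numeric values once via `Number`, which covers both the `Long` values produced by a JSON round trip and plain `Integer`s. A standalone sketch, not the actual Pirk class; the `asInt` helper and the trimmed field list are illustrative:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class QueryInfoSketch
{
  final UUID identifier;
  final int numSelectors;
  final int hashBitSize;

  QueryInfoSketch(Map<String,Object> m)
  {
    identifier = UUID.fromString((String) m.get("uuid"));
    // Works whether JSON round-tripping produced Long or Integer values,
    // so no try/catch duplication is needed.
    numSelectors = asInt(m.get("numSelectors"));
    hashBitSize = asInt(m.get("hashBitSize"));
  }

  private static int asInt(Object value)
  {
    return ((Number) value).intValue();
  }

  public static void main(String[] args)
  {
    Map<String,Object> m = new HashMap<>();
    m.put("uuid", "123e4567-e89b-12d3-a456-426655440000");
    m.put("numSelectors", 5L); // a Long, as after a JSON round trip
    m.put("hashBitSize", 12);  // a plain Integer
    QueryInfoSketch q = new QueryInfoSketch(m);
    if (q.numSelectors != 5 || q.hashBitSize != 12)
      throw new AssertionError();
    System.out.println("ok");
  }
}
```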
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428705#comment-15428705 ] ASF GitHub Bot commented on PIRK-4: --- Github user clharris commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/74#discussion_r75539766

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---

```
@@ -319,4 +314,80 @@ public static void loadCacheFromHDFS(FileSystem fs, String hdfsFileName, Query q
     return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from extracted data partitions in the form of ArrayList<>
+   *
+   * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
+   *
+   * Uses a static LRU cache for the modular exponentiation
+   *
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   *
+   * Emits {@code Tuple2<>}
+   */
+  public static ArrayList<Tuple2<Long,BigInteger>> computeEncRow(ArrayList<BigInteger> dataPartitions, Query query, int rowIndex, int colIndex)
+      throws IOException
+  {
+    ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
```

--- End diff --

Will do.
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428689#comment-15428689 ] ASF GitHub Bot commented on PIRK-4: --- Github user smarthi commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/74#discussion_r75538064

--- Diff: src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java ---

```
@@ -0,0 +1,327 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***/
+package org.apache.pirk.storm;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.test.utils.BaseTests;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.TestJob;
+import org.json.simple.JSONObject;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.*;
+
+@Category(IntegrationTest.class)
+public class KafkaStormIntegrationTest
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaStormIntegrationTest.class);
+
+  private static final LocalFileSystemStore localStore = new LocalFileSystemStore();
+
+  private static TestingServer zookeeperLocalCluster;
+  private static KafkaServer kafkaLocalBroker;
+  private static ZkClient zkClient;
+
+  private static final String topic = "pirk_test_topic";
+  private static final String kafkaTmpDir = "/tmp/kafka";
+
+  private static File fileQuery;
+  private static File fileQuerier;
+  private static String localStopListFile;
+
+  private QueryInfo queryInfo;
+  private BigInteger nSquared;
+
+  @Test
+  public void testKafkaStormIntegration() throws Exception
+  {
+    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
+    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "10");
+    SystemConfiguration.setProperty("storm.spout.parallelism", "1");
+    SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1");
+
```
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428679#comment-15428679 ] ASF GitHub Bot commented on PIRK-4: --- Github user smarthi commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/74#discussion_r75537551

--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---

```
@@ -96,6 +98,42 @@ public QueryInfo(UUID identifierInput, int numSelectorsInput, int hashBitSizeInp
     printQueryInfo();
   }

+  public QueryInfo(Map queryInfoMap)
+  {
+    // Seemed that the Storm Config would serialize the map as a json and read back in with numeric values as longs.
+    // So had to cast as a long and call .intValue. However, this didn't work in the PirkHashScheme and had to try
+    // the normal way of doing it as well.
+    try
+    {
+      identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+      queryType = (String) queryInfoMap.get("queryType");
+      numSelectors = ((Long) queryInfoMap.get("numSelectors")).intValue();
+      hashBitSize = ((Long) queryInfoMap.get("hashBitSize")).intValue();
+      hashKey = (String) queryInfoMap.get("hashKey");
+      numBitsPerDataElement = ((Long) queryInfoMap.get("numBitsPerDataElement")).intValue();
+      numPartitionsPerDataElement = ((Long) queryInfoMap.get("numPartitionsPerDataElement")).intValue();
+      dataPartitionBitSize = ((Long) queryInfoMap.get("dataPartitionsBitSize")).intValue();
+      useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+      useHDFSExpLookupTable = (boolean) queryInfoMap.get("useHDFSExpLookupTable");
+      embedSelector = (boolean) queryInfoMap.get("embedSelector");
+    } catch (ClassCastException e)
+    {
+      identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+      queryType = (String) queryInfoMap.get("queryType");
+      numSelectors = (int) queryInfoMap.get("numSelectors");
+      hashBitSize = (int) queryInfoMap.get("hashBitSize");
+      hashKey = (String) queryInfoMap.get("hashKey");
+      numBitsPerDataElement = (int) queryInfoMap.get("numBitsPerDataElement");
+      numPartitionsPerDataElement = (int) queryInfoMap.get("numPartitionsPerDataElement");
+      dataPartitionBitSize = (int) queryInfoMap.get("dataPartitionsBitSize");
+      useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+      useHDFSExpLookupTable = (boolean) queryInfoMap.get("useHDFSExpLookupTable");
+      embedSelector = (boolean) queryInfoMap.get("embedSelector");
+    }
```

--- End diff --

Why are we doing the same thing in the try and catch clauses?
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428680#comment-15428680 ] ASF GitHub Bot commented on PIRK-4: --- Github user smarthi commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/74#discussion_r75537583

--- Diff: src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---

```
@@ -151,6 +189,24 @@ public boolean getEmbedSelector()
     return embedSelector;
   }

+  public Map toMap()
+  {
+    HashMap queryInfo = new HashMap();
```

--- End diff --

Use Map on the LHS.
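The review point is the usual program-to-the-interface convention: declare the local (and, ideally, the return type) as Map so the HashMap choice stays an implementation detail. A minimal illustrative sketch (the class and field values are placeholders, not Pirk's):

```java
import java.util.HashMap;
import java.util.Map;

public class ToMapSketch
{
  private final String queryType = "example";
  private final int numSelectors = 3;

  public Map<String,Object> toMap()
  {
    // Declare as Map on the LHS; only the constructor names HashMap.
    Map<String,Object> queryInfo = new HashMap<>();
    queryInfo.put("queryType", queryType);
    queryInfo.put("numSelectors", numSelectors);
    return queryInfo;
  }

  public static void main(String[] args)
  {
    Map<String,Object> m = new ToMapSketch().toMap();
    if (!m.get("queryType").equals("example")) throw new AssertionError();
    System.out.println("ok");
  }
}
```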
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428674#comment-15428674 ] ASF GitHub Bot commented on PIRK-4: --- Github user smarthi commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/74#discussion_r75537022

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java ---

```
@@ -0,0 +1,198 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***/
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Bolt to compute and output the final Response object for a query
+ *
+ * Receives {@code} tuples, computes the final column product for each colIndex, records the results in the final Response object, and
+ * outputs the final Response object for the query.
+ *
+ * Flush signals are sent to the OutputBolt from the EncColMultBolts via a tuple of the form {@code <-1, 0>}. Once a flush signal has been received from each
+ * EncColMultBolt (or a timeout is reached), the final column product is computed and the final Response is formed and emitted.
+ *
+ * Currently, the Responses are written to HDFS at the location specified by the outputFile with the timestamp appended.
+ *
+ * TODO: -- Enable other Response output locations
+ */
+public class OutputBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(OutputBolt.class);
+
+  private OutputCollector outputCollector;
+  private QueryInfo queryInfo;
+  private Response response;
+  private String outputFile;
+  private boolean hdfs;
+  private String hdfsUri;
+  private Integer flushCounter = 0;
+  private ArrayList tuplesToAck = new ArrayList();
+  private Integer totalFlushSigs;
+
+  private LocalFileSystemStore localStore;
+  private HadoopFileSystemStore hadoopStore;
+
+  // This latch just serves as a hook for testing.
+  public static CountDownLatch latch = new CountDownLatch(1);
+
+  // This is the main object here. It holds column Id -> product
+  private HashMap resultsMap = new HashMap();
```

--- End diff --

Use a Map on the LHS.
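The flush protocol described in the OutputBolt javadoc can be sketched without any Storm types: partial products are multiplied into a results map, a column index of -1 counts as a flush signal, and the result is final once every upstream EncColMultBolt has flushed. Everything below is illustrative, including the assumption that products are reduced mod N^2 as they accumulate:

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class OutputAggregatorSketch
{
  private final Map<Long,BigInteger> resultsMap = new HashMap<>();
  private final BigInteger nSquared;
  private final int totalFlushSigs;
  private int flushCounter = 0;

  OutputAggregatorSketch(BigInteger nSquared, int totalFlushSigs)
  {
    this.nSquared = nSquared;
    this.totalFlushSigs = totalFlushSigs;
  }

  // Returns true once every upstream bolt has sent its <-1, 0> flush signal.
  boolean receive(long colIndex, BigInteger value)
  {
    if (colIndex == -1)
    {
      return ++flushCounter == totalFlushSigs;
    }
    // Multiply this partial product into the running column product.
    resultsMap.merge(colIndex, value, (a, b) -> a.multiply(b).mod(nSquared));
    return false;
  }

  BigInteger columnProduct(long colIndex)
  {
    return resultsMap.getOrDefault(colIndex, BigInteger.ONE);
  }

  public static void main(String[] args)
  {
    OutputAggregatorSketch agg = new OutputAggregatorSketch(BigInteger.valueOf(77 * 77), 2);
    agg.receive(0, BigInteger.valueOf(10));
    agg.receive(0, BigInteger.valueOf(20));
    boolean doneAfterFirst = agg.receive(-1, BigInteger.ZERO);
    boolean doneAfterSecond = agg.receive(-1, BigInteger.ZERO);
    if (doneAfterFirst || !doneAfterSecond) throw new AssertionError();
    if (!agg.columnProduct(0).equals(BigInteger.valueOf(200))) throw new AssertionError();
    System.out.println("ok");
  }
}
```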
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428663#comment-15428663 ] ASF GitHub Bot commented on PIRK-4: --- Github user smarthi commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/74#discussion_r75536159

--- Diff: pom.xml ---

```
@@ -248,6 +270,64 @@
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${storm.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-kafka</artifactId>
+      <version>${storm.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>0.9.0.1</version>
```

--- End diff --

Make this a property, ${kafka.version}.
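The suggested change is the standard Maven idiom of declaring the version once as a property and referencing it from the dependency; a sketch of what that would look like (property name `kafka.version` as proposed in the review):

```xml
<properties>
  <kafka.version>0.9.0.1</kafka.version>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>${kafka.version}</version>
</dependency>
```

This keeps the Kafka version in one place when several modules or dependencies need to agree on it.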
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428654#comment-15428654 ] ASF GitHub Bot commented on PIRK-4: --- Github user smarthi commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/74#discussion_r75535198

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---

```
@@ -319,4 +314,80 @@ public static void loadCacheFromHDFS(FileSystem fs, String hdfsFileName, Query q
     return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from extracted data partitions in the form of ArrayList<>
+   *
+   * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
+   *
+   * Uses a static LRU cache for the modular exponentiation
+   *
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   *
+   * Emits {@code Tuple2<>}
+   */
+  public static ArrayList<Tuple2<Long,BigInteger>> computeEncRow(ArrayList<BigInteger> dataPartitions, Query query, int rowIndex, int colIndex)
+      throws IOException
+  {
+    ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
+
+    // Pull the corresponding encrypted row query
+    BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+    // Initialize the column counter
+    long colCounter = colIndex;
+
+    logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+
+    // Update the associated column values
+    for (int i = 0; i < dataPartitions.size(); ++i)
+    {
+      BigInteger part = dataPartitions.get(i);
+
+      BigInteger exp = null;
+      try
+      {
+        exp = expCache.get(new Tuple3<>(rowQuery, part, query.getNSquared()));
+      } catch (ExecutionException e)
+      {
+        e.printStackTrace();
+      }
+
+      logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2)
+          + " exp = " + exp + " i = " + i + " partition = " + dataPartitions.get(i) + " = " + dataPartitions.get(i).toString(2));
+
+      returnPairs.add(new Tuple2<>(colCounter, exp));
+
+      ++colCounter;
+    }
+
+    return returnPairs;
+  }
+
+  public static ArrayList<Tuple2<Long,BigInteger>> computeEncRow(BigInteger part, Query query, int rowIndex, int colIndex) throws IOException
+  {
+    ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
```

--- End diff --

Use a List on the LHS and as the return type of the method.
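The expCache used above appears to be a loading cache (the ExecutionException it throws is a hint) that memoizes the expensive BigInteger.modPow. The idea can be sketched with only the standard library, using an access-ordered LinkedHashMap as a small LRU map; the string key and size limit are illustrative choices, not Pirk's:

```java
import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.Map;

public class ExpCacheSketch
{
  private static final int MAX_ENTRIES = 1000;

  // Access-ordered LinkedHashMap: removeEldestEntry evicts the
  // least-recently-used entry once the size limit is exceeded.
  private static final Map<String,BigInteger> cache =
      new LinkedHashMap<String,BigInteger>(16, 0.75f, true)
      {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String,BigInteger> eldest)
        {
          return size() > MAX_ENTRIES;
        }
      };

  static BigInteger modPowCached(BigInteger base, BigInteger exp, BigInteger modulus)
  {
    // Memoize base^exp mod modulus; repeated (base, exp, modulus)
    // triples skip the expensive modPow.
    String key = base + "|" + exp + "|" + modulus;
    return cache.computeIfAbsent(key, k -> base.modPow(exp, modulus));
  }

  public static void main(String[] args)
  {
    BigInteger nSq = BigInteger.valueOf(77 * 77);
    BigInteger r = modPowCached(BigInteger.valueOf(5), BigInteger.valueOf(3), nSq);
    if (!r.equals(BigInteger.valueOf(125))) throw new AssertionError();
    // Second call hits the cache rather than recomputing.
    if (!modPowCached(BigInteger.valueOf(5), BigInteger.valueOf(3), nSq).equals(r))
      throw new AssertionError();
    System.out.println("ok");
  }
}
```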
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428646#comment-15428646 ] ASF GitHub Bot commented on PIRK-4: --- Github user smarthi commented on a diff in the pull request: https://github.com/apache/incubator-pirk/pull/74#discussion_r75534791

--- Diff: src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java ---

```
@@ -65,6 +66,13 @@
     else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("spark"))
     {
       ComputeResponse computeResponse = new ComputeResponse(fs);
       computeResponse.performQuery();
     }
+    else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("storm"))
```

--- End diff --

We will need to abstract this out and implement it as a Strategy pattern: as we add more backends like Flink, Spark, Beam, and others in the future, it is only going to get messy with multiple if-else statements. For context, here is a code snippet from IBM SystemML; they are now adding Flink support, which would be yet another if-else block of code: https://gist.github.com/smarthi/eb848e46621b7444924f
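The refactor being suggested can be sketched as a registry of launcher strategies keyed by platform name, replacing the if-else chain; the interface name, the registry, and the launcher bodies below are all hypothetical, not Pirk code:

```java
import java.util.HashMap;
import java.util.Map;

public class ResponderStrategySketch
{
  // Strategy interface: each backend supplies its own launcher.
  interface ResponderLauncher
  {
    String run(); // a real launcher would perform the query; here it just reports
  }

  // Registry keyed by platform name; adding a backend means adding an
  // entry here (or discovering it via ServiceLoader), not another else-if.
  private static final Map<String,ResponderLauncher> LAUNCHERS = new HashMap<>();

  static
  {
    LAUNCHERS.put("mapreduce", () -> "ran mapreduce responder");
    LAUNCHERS.put("spark", () -> "ran spark responder");
    LAUNCHERS.put("storm", () -> "ran storm responder");
  }

  static String launch(String platform)
  {
    ResponderLauncher launcher = LAUNCHERS.get(platform);
    if (launcher == null)
    {
      throw new IllegalArgumentException("Unknown platform: " + platform);
    }
    return launcher.run();
  }

  public static void main(String[] args)
  {
    if (!launch("storm").equals("ran storm responder")) throw new AssertionError();
    System.out.println("ok");
  }
}
```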
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428638#comment-15428638 ] ASF GitHub Bot commented on PIRK-4: --- GitHub user clharris opened a pull request:

https://github.com/apache/incubator-pirk/pull/74

PIRK-4 [WIP] Add Streaming Implementation for Apache Storm

This is an initial implementation of a streaming version of Pirk that runs on Apache Storm. I am leaving it temporarily as WIP so people have a chance to look it over and add feedback. Right now there is only one integration test, which runs the Storm topology 4 times with each of the different significant configuration possibilities. I wanted to unit test the bolts, but that seems not very straightforward given the way the Pirk processing works. I'll try to add some documentation (at minimum a diagram of the Pirk Storm topology) by early next week.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/clharris/incubator-pirk pirk-4

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-pirk/pull/74.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #74

commit 577fb33d04ce9061d2e88a27595737f30b9c446d (charris, 2016-07-19T14:33:34Z): Temporarily using Cloudera pom.
commit 41f9042b81a5b683ecbd2c718f98712283acacf9 (charris, 2016-07-19T14:37:36Z): PIRK-4 Initial commit.
commit 0c147c36b377c730c7d6489a82b71958e59cdb2d (charris, 2016-07-19T14:38:27Z): Adding toMap method and constructor.
commit a30f1a9de6e02dbf55bdc4e771d673b429ac331f (charris, 2016-07-19T14:39:14Z): Adding functionality for streaming processing.
commit 8d27e5d3f2ff1fec9f2fffd29052189f567d51f0 (charris, 2016-07-19T14:40:17Z): Broke out method to checkDNSHostNameQueryResults.
commit 4df0e31836ec26da00662d92c5eac081c61419b2 (charris, 2016-07-19T15:04:07Z): Added Storm parameters.
commit 126361c31426d8285b4202a1134a653dd210dd4b (eawilliams, 2016-07-22T14:55:28Z): initial modifications to storm code
commit 9f78a94dec84c7fcb5c09f13ae62598a7700adf4 (eawilliams, 2016-07-22T14:55:57Z): initial modifications to storm code - removing logs
commit 4bdd0c1f2461fdf0a1254bd6369db70b05b3e12a (eawilliams, 2016-07-22T15:52:06Z): splitPartitions update to storm implementation
commit 180c56161ceffbe7c2d24719f8f9e5071ca632cb (eawilliams, 2016-07-23T16:44:24Z): changed splitPartitions to false in test
commit 2520fbe8e0e6eb6a9ee8d259e0a5f10f965e916e (charris, 2016-07-25T15:02:00Z): splitPartitions allows data element partitions to be sent individually.
commit e4e69e0119df4c0659a55bee7d6a5c55a016c9ff (charris, 2016-07-30T19:41:11Z): Moved hashing into the spout.
commit 3262f0fdd52d62c050c6dc7fb6c6b41c34e99173 (charris, 2016-07-30T19:41:55Z): Added some functionality useful for Storm.
commit 290404531eaaa4442ef611252f4d24b2be7a6371 (charris, 2016-07-30T19:42:12Z): Fixed based on refactoring.
commit ba81c6814d0f0e01974fb934a0625c44676a316e (charris, 2016-07-31T02:21:55Z): Added Apache headers.
commit 987d2f5254e359aa54ec5969935dda124b4d85a1 (charris, 2016-07-31T20:34:34Z): Changed Scala version of Spark and added curator-test.
commit 3b617c1b6c5ccdc560a2e32e58815b8732f18e58 (charris, 2016-07-31T23:17:34Z): Merging.
commit 22b946cbb361f96f4ecb67fb317d44be1c1b1498 (charris, 2016-07-31T23:18:24Z): Merge branch 'master' of https://github.com/apache/incubator-pirk into storm
commit 204495f8a75d38fd0b51a0970352e2204367e6ff (eawilliams, 2016-08-01T00:34:32Z): fixed logging and scala version issues via pom modifications; refactored storm test package
commit 8f8113727be5ae7de97adf82d8953abb2efbde63 (eawilliams, 2016-08-01T01:57:04Z): Merging - closes clharris/incubator-pirk#1
commit 1f8b42fdbf2326114f0cbce5fdfb7bd9c53306c3 (Chris Harris, 2016-08-01T12:22:36Z): Merge pull request #1 from ellisonanne/chris-storm "fixed logging and scala version issues via pom modifications; refacto…"
commit a61c9765bff4018041201d3adc7f81038f5499c6 (charris)
[jira] [Commented] (PIRK-4) Add Streaming Implementation for Apache Storm
[ https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15390109#comment-15390109 ] Suneel Marthi commented on PIRK-4: -- I have an example Beam application that computes the Okapi BM25 score for each word in a document of a corpus (see https://en.wikipedia.org/wiki/Okapi_BM25). This code can be executed on both the Flink and Spark runners just by setting:

    options.setRunner(FlinkRunner.class);