[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-15 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174787158
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
 ---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
--- End diff --

alright, done and done.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-14 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174488230
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
 ---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
--- End diff --

Yeah, that's a fair point. Yeah, I'd just go ahead with 
`ThreadLocal>` and make the arguments to the 
couple functions that take it `KafkaProducer` and I'm good.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-14 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174478516
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
 ---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
--- End diff --

Ah, so if I'm following you, making the `ThreadLocal` into 
`ThreadLocal>` would solve the issue, correct?  
I'd prefer to not change to`SendToKafka` because it implies that we might 
be sending something other than String in this implementation.  We would not 
because the message generator sends Strings and I'd prefer to not generalize 
that.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-14 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174468438
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
 ---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
--- End diff --

To expand on the reasoning for just making it SendToKafka, right now 
it's using raw KafkaProducer, with one exception.  Since we already know the 
types, we can make it KafkaProducer in the arguments.  I think 
we shouldn't leave it raw, because  passing in a different KafkaProducer is 
going to be incorrect the exception at line 71:
`KafkaProducer producer = kafkaProducer.get();`.

I definitely don't think making the whole thing pluggable is worth doing 
now but making the KafkaProducer's all  for correctness vs just 
making the class  is basically the same amount of effort.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186826
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
 ---
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+public class BiasedSampler implements Sampler {
+  TreeMap> discreteDistribution;
+  public BiasedSampler(List>  
discreteDistribution, int max) {
+this.discreteDistribution = createDistribution(discreteDistribution, 
max);
+  }
+
+  public static List> readDistribution(File 
distrFile) throws IOException {
+List> ret = new ArrayList<>();
+System.out.println("Using biased sampler with the following biases:");
+try(BufferedReader br = new BufferedReader(new FileReader(distrFile))) 
{
+  int sumLeft = 0;
+  int sumRight = 0;
+  for(String line = null;(line = br.readLine()) != null;) {
+if(line.startsWith("#")) {
+  continue;
+}
+Iterable it = Splitter.on(",").split(line.trim());
+int left = Integer.parseInt(Iterables.getFirst(it, null));
+int right = Integer.parseInt(Iterables.getLast(it, null));
+System.out.println("\t" + left + "% of templates will comprise 
roughly " + right + "% of sample output");
+ret.add(new AbstractMap.SimpleEntry<>(left, right));
+sumLeft += left;
+sumRight += right;
+  }
+  if(sumLeft > 100 || sumRight > 100 ) {
+throw new IllegalStateException("Neither columns must sum to 
beyond 100.  " +
+"The first column is the % of templates. " +
+"The second column is the % of the sample that % of 
template occupies.");
+  }
+  else if(sumLeft < 100 && sumRight < 100) {
+int left = 100 - sumLeft;
+int right = 100 - sumRight;
+System.out.println("\t" + left + "% of templates will comprise 
roughly " + right + "% of sample output");
+ret.add(new AbstractMap.SimpleEntry<>(left, right));
+  }
+  return ret;
+}
+  }
+
+  private static TreeMap>
+ createDistribution(List>  
discreteDistribution, int max) {
+TreeMap> ret = new TreeMap<>();
+int from = 0;
+double weight = 0.0d;
+for(Map.Entry kv : discreteDistribution) {
+  double pctVals = kv.getKey()/100.0;
+  int to = from + (int)(max*pctVals);
+  double pctWeight = kv.getValue()/100.0;
+  ret.put(weight, new AbstractMap.SimpleEntry<>(from, to));
+  weight += pctWeight;
+  from = to;
+}
+return ret;
+  }
+
+  @Override
+  public int sample(Random rng, int limit) {
+double weight = rng.nextDouble();
+Map.Entry range = 
discreteDistribution.floorEntry(weight).getValue();
+return rng.nextInt(range.getValue() - range.getKey()) + range.getKey();
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186723
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
 ---
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+public class BiasedSampler implements Sampler {
+  TreeMap> discreteDistribution;
+  public BiasedSampler(List>  
discreteDistribution, int max) {
+this.discreteDistribution = createDistribution(discreteDistribution, 
max);
+  }
+
+  public static List> readDistribution(File 
distrFile) throws IOException {
+List> ret = new ArrayList<>();
+System.out.println("Using biased sampler with the following biases:");
+try(BufferedReader br = new BufferedReader(new FileReader(distrFile))) 
{
+  int sumLeft = 0;
+  int sumRight = 0;
+  for(String line = null;(line = br.readLine()) != null;) {
+if(line.startsWith("#")) {
+  continue;
+}
+Iterable it = Splitter.on(",").split(line.trim());
+int left = Integer.parseInt(Iterables.getFirst(it, null));
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186700
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public class Writer {
+
+  private int summaryLookback;
+  private List summaries = new ArrayList<>();
+  private List writers;
+  private List monitors;
+
+  public Writer(List monitors, int summaryLookback, 
List writers) {
+this.summaryLookback = summaryLookback;
+this.writers = writers;
+this.monitors = monitors;
+for(AbstractMonitor m : monitors) {
+  this.summaries.add(new LinkedList<>());
+}
+  }
+
+  public void writeAll() {
+int i = 0;
+Date dateOf = new Date();
+List results = new ArrayList<>();
+for(AbstractMonitor m : monitors) {
+  Long eps = m.get();
+  if(eps != null) {
+if (summaryLookback > 0) {
+  LinkedList summary = summaries.get(i);
+  addToLookback(eps == null ? Double.NaN : eps.doubleValue(), 
summary);
+  results.add(new Results(m.format(), m.name(), eps, 
Optional.of(getStats(summary;
+}
+else {
+  results.add(new Results(m.format(),m.name(), eps, 
Optional.empty()));
+}
+  }
+  else {
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186800
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/UnbiasedSampler.java
 ---
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import java.util.Random;
+
+public class UnbiasedSampler implements Sampler {
+
+  @Override
+  public int sample(Random rng, int limit) {
+return rng.nextInt(limit);
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186641
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public class Writer {
+
+  private int summaryLookback;
+  private List summaries = new ArrayList<>();
+  private List writers;
+  private List monitors;
+
+  public Writer(List monitors, int summaryLookback, 
List writers) {
+this.summaryLookback = summaryLookback;
+this.writers = writers;
+this.monitors = monitors;
+for(AbstractMonitor m : monitors) {
+  this.summaries.add(new LinkedList<>());
+}
+  }
+
+  public void writeAll() {
+int i = 0;
+Date dateOf = new Date();
+List results = new ArrayList<>();
+for(AbstractMonitor m : monitors) {
+  Long eps = m.get();
+  if(eps != null) {
+if (summaryLookback > 0) {
+  LinkedList summary = summaries.get(i);
+  addToLookback(eps == null ? Double.NaN : eps.doubleValue(), 
summary);
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186597
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java
 ---
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+public class ConsoleWriter implements Consumer {
+
+  private String getSummary(DescriptiveStatistics stats) {
+return String.format("Mean: %d, Std Dev: %d", (int)stats.getMean(), 
(int)Math.sqrt(stats.getVariance()));
+  }
+
+  @Override
+  public void accept(Writable writable) {
+List parts = new ArrayList<>();
+Date date = writable.getDate();
+for(Results r : writable.getResults()) {
+  Long eps = r.getEps();
+  if(eps != null) {
+String part = String.format(r.getFormat(), eps);
+if (r.getHistory().isPresent()) {
+  part += " (" + getSummary(r.getHistory().get()) + ")";
+}
+parts.add(part);
+  }
+}
+if(date != null) {
+  DateFormat dateFormat = new SimpleDateFormat("/MM/dd HH:mm:ss");
+  String header = dateFormat.format(date) + " - ";
+  String emptyHeader = "";
+  for (int i = 0; i < header.length(); ++i) {
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186529
  
--- Diff: 
metron-contrib/metron-performance/src/test/java/org/apache/metron/performance/load/SendToKafkaTest.java
 ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SendToKafkaTest {
+
+  @Test
+  public void testWritesCorrectNumber() throws InterruptedException {
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186458
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
 ---
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum LoadOptions implements CLIOptions {
+  HELP(new OptionHandler() {
+
+@Override
+public String getShortCode() {
+  return "h";
+}
+
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  return new Option(s, "help", false, "Generate Help screen");
+}
+  }),
+  ZK(new OptionHandler() {
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  Option o = new Option(s, "zk_quorum", true, "zookeeper quorum");
+  o.setArgName("QUORUM");
+  o.setRequired(false);
+  return o;
+}
+
+@Override
+public Optional getValue(LoadOptions option, CommandLine cli) {
+  if(option.has(cli)) {
+return Optional.ofNullable(option.get(cli));
+  }
+  else {
+return Optional.empty();
+  }
+}
+
+@Override
+public String getShortCode() {
+  return "z";
+}
+  }),
+  CONSUMER_GROUP(new OptionHandler() {
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  Option o = new Option(s, "consumer_group", true, "Consumer Group.  
The default is " + LoadGenerator.CONSUMER_GROUP);
+  o.setArgName("GROUP_ID");
+  o.setRequired(false);
+  return o;
+}
+
+@Override
+public Optional getValue(LoadOptions option, CommandLine cli) {
+  if(option.has(cli)) {
+return Optional.ofNullable(option.get(cli));
+  }
+  else {
+return Optional.of(LoadGenerator.CONSUMER_GROUP);
+  }
+}
+
+@Override
+public String getShortCode() {
+  return "cg";
+}
+  }),
+  BIASED_SAMPLE(new OptionHandler() {
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  Option o = new Option(s, "sample_bias", true, "The discrete 
distribution to bias the sampling. " +
+  "This is a CSV of 2 columns.  The first column is the % of 
the templates " +
+  "and the 2nd column is the probability (0-100) that it's 
chosen.  For instance:\n" +
+  "  20,80\n" +
+  "  80,20\n" +
+  "implies that 20% of the templates will comprise 80% of the 
output and the remaining 80% of the templates will comprise 20% of the 
output.");
+  o.setArgName("BIAS_FILE");
+  o.setRequired(false);
+  return o;
+}
+
+@Override
+public Optional getValue(LoadOptions option, CommandLine cli) {
+  if(!option.has(cli)) {
+return Optional.empty();
+  }
+  File discreteDistributionFile  = new File(option.get(cli));
+  if(discreteDistributionFile.exists()) {
+try {
+
+  return 

[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186359
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import 
org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+EnumMap evaluatedArgs = 
LoadOptions.createConfig(cli);
+Map kafkaConfig = new HashMap<>();
+kafkaConfig.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+kafkaConfig.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+kafkaConfig.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConfig.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+if(LoadOptions.ZK.has(cli)) {
+  String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
+  kafkaConfig.put("bootstrap.servers", 
Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)));
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186403
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import 
org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+EnumMap evaluatedArgs = 
LoadOptions.createConfig(cli);
+Map kafkaConfig = new HashMap<>();
+kafkaConfig.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+kafkaConfig.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+kafkaConfig.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConfig.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+if(LoadOptions.ZK.has(cli)) {
+  String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
+  kafkaConfig.put("bootstrap.servers", 
Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)));
+}
+String groupId = 
evaluatedArgs.get(LoadOptions.CONSUMER_GROUP).get().toString();
+System.out.println("Consumer Group: " + groupId);
+kafkaConfig.put("group.id", groupId);
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186422
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
 ---
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum LoadOptions implements CLIOptions {
+  HELP(new OptionHandler() {
+
+@Override
+public String getShortCode() {
+  return "h";
+}
+
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  return new Option(s, "help", false, "Generate Help screen");
+}
+  }),
+  ZK(new OptionHandler() {
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  Option o = new Option(s, "zk_quorum", true, "zookeeper quorum");
+  o.setArgName("QUORUM");
+  o.setRequired(false);
+  return o;
+}
+
+@Override
+public Optional getValue(LoadOptions option, CommandLine cli) {
+  if(option.has(cli)) {
+return Optional.ofNullable(option.get(cli));
+  }
+  else {
+return Optional.empty();
+  }
+}
+
+@Override
+public String getShortCode() {
+  return "z";
+}
+  }),
+  CONSUMER_GROUP(new OptionHandler() {
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  Option o = new Option(s, "consumer_group", true, "Consumer Group.  
The default is " + LoadGenerator.CONSUMER_GROUP);
+  o.setArgName("GROUP_ID");
+  o.setRequired(false);
+  return o;
+}
+
+@Override
+public Optional getValue(LoadOptions option, CommandLine cli) {
+  if(option.has(cli)) {
+return Optional.ofNullable(option.get(cli));
+  }
+  else {
+return Optional.of(LoadGenerator.CONSUMER_GROUP);
+  }
+}
+
+@Override
+public String getShortCode() {
+  return "cg";
+}
+  }),
+  BIASED_SAMPLE(new OptionHandler() {
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  Option o = new Option(s, "sample_bias", true, "The discrete 
distribution to bias the sampling. " +
+  "This is a CSV of 2 columns.  The first column is the % of 
the templates " +
+  "and the 2nd column is the probability (0-100) that it's 
chosen.  For instance:\n" +
+  "  20,80\n" +
+  "  80,20\n" +
+  "implies that 20% of the templates will comprise 80% of the 
output and the remaining 80% of the templates will comprise 20% of the 
output.");
+  o.setArgName("BIAS_FILE");
+  o.setRequired(false);
+  return o;
+}
+
+@Override
+public Optional getValue(LoadOptions option, CommandLine cli) {
+  if(!option.has(cli)) {
+return Optional.empty();
+  }
+  File discreteDistributionFile  = new File(option.get(cli));
+  if(discreteDistributionFile.exists()) {
+try {
+
+  return 

[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186490
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java
 ---
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.metron.performance.sampler.Sampler;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class MessageGenerator implements Supplier {
+  private static ThreadLocal rng = ThreadLocal.withInitial(() -> 
new Random());
+  private static AtomicLong guidOffset = new AtomicLong(0);
+  private static String guidPrefix = "6141faf6-a8ba-4044-ab80-";
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186316
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import 
org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186275
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import 
org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+EnumMap evaluatedArgs = 
LoadOptions.createConfig(cli);
+Map kafkaConfig = new HashMap<>();
+kafkaConfig.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
--- End diff --

done


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174181759
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
 ---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
--- End diff --

well, I mean, I suppose it could be, but I didn't see value in it since all 
of our inputs currently are strings and the message generator exclusively 
generates strings, not other messages.  It'd be a more serious refactoring to 
genericize this to make the message generator pluggable.  Also, for utility 
code, I think I'd rather avoid prematurely generalizing it.  Let me know what 
you think.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173943133
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java
 ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public abstract class AbstractMonitor implements Supplier, 
MonitorNaming {
+  private static final double EPSILON = 1e-6;
+  protected Optional kafkaTopic;
+  protected long timestampPrevious = 0;
+  public AbstractMonitor(Optional kafkaTopic) {
+this.kafkaTopic = kafkaTopic;
+  }
+
+  protected abstract Long monitor(double deltaTs);
+
+  @Override
+  public Long get() {
+long timeStarted = System.currentTimeMillis();
--- End diff --

Nope, I'm missing my common sense is all.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173942355
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java
 ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public abstract class AbstractMonitor implements Supplier, 
MonitorNaming {
+  private static final double EPSILON = 1e-6;
+  protected Optional kafkaTopic;
+  protected long timestampPrevious = 0;
+  public AbstractMonitor(Optional kafkaTopic) {
+this.kafkaTopic = kafkaTopic;
+  }
+
+  protected abstract Long monitor(double deltaTs);
+
+  @Override
+  public Long get() {
+long timeStarted = System.currentTimeMillis();
--- End diff --

To my knowledge, `currentTimeMillis()` is not affected by DST as it is the 
milliseconds since unix epoch UTC.  Cursory googling seems to hold up this 
assertion.  Did I miss something?


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173921179
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
 ---
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum LoadOptions implements CLIOptions {
+  HELP(new OptionHandler() {
+
+@Override
+public String getShortCode() {
+  return "h";
+}
+
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  return new Option(s, "help", false, "Generate Help screen");
+}
+  }),
+  ZK(new OptionHandler() {
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  Option o = new Option(s, "zk_quorum", true, "zookeeper quorum");
+  o.setArgName("QUORUM");
+  o.setRequired(false);
+  return o;
+}
+
+@Override
+public Optional getValue(LoadOptions option, CommandLine cli) {
+  if(option.has(cli)) {
+return Optional.ofNullable(option.get(cli));
+  }
+  else {
+return Optional.empty();
+  }
+}
+
+@Override
+public String getShortCode() {
+  return "z";
+}
+  }),
+  CONSUMER_GROUP(new OptionHandler() {
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  Option o = new Option(s, "consumer_group", true, "Consumer Group.  
The default is " + LoadGenerator.CONSUMER_GROUP);
+  o.setArgName("GROUP_ID");
+  o.setRequired(false);
+  return o;
+}
+
+@Override
+public Optional getValue(LoadOptions option, CommandLine cli) {
+  if(option.has(cli)) {
+return Optional.ofNullable(option.get(cli));
+  }
+  else {
+return Optional.of(LoadGenerator.CONSUMER_GROUP);
+  }
+}
+
+@Override
+public String getShortCode() {
+  return "cg";
+}
+  }),
+  BIASED_SAMPLE(new OptionHandler() {
+@Nullable
+@Override
+public Option apply(@Nullable String s) {
+  Option o = new Option(s, "sample_bias", true, "The discrete 
distribution to bias the sampling. " +
+  "This is a CSV of 2 columns.  The first column is the % of 
the templates " +
+  "and the 2nd column is the probability (0-100) that it's 
chosen.  For instance:\n" +
+  "  20,80\n" +
+  "  80,20\n" +
+  "implies that 20% of the templates will comprise 80% of the 
output and the remaining 80% of the templates will comprise 20% of the 
output.");
+  o.setArgName("BIAS_FILE");
+  o.setRequired(false);
+  return o;
+}
+
+@Override
+public Optional getValue(LoadOptions option, CommandLine cli) {
+  if(!option.has(cli)) {
+return Optional.empty();
+  }
+  File discreteDistributionFile  = new File(option.get(cli));
+  if(discreteDistributionFile.exists()) {
+try {
+
+  return 

[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173928192
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java
 ---
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+public class ConsoleWriter implements Consumer {
+
+  private String getSummary(DescriptiveStatistics stats) {
+return String.format("Mean: %d, Std Dev: %d", (int)stats.getMean(), 
(int)Math.sqrt(stats.getVariance()));
+  }
+
+  @Override
+  public void accept(Writable writable) {
+List parts = new ArrayList<>();
+Date date = writable.getDate();
+for(Results r : writable.getResults()) {
+  Long eps = r.getEps();
+  if(eps != null) {
+String part = String.format(r.getFormat(), eps);
+if (r.getHistory().isPresent()) {
+  part += " (" + getSummary(r.getHistory().get()) + ")";
+}
+parts.add(part);
+  }
+}
+if(date != null) {
+  DateFormat dateFormat = new SimpleDateFormat("/MM/dd HH:mm:ss");
+  String header = dateFormat.format(date) + " - ";
+  String emptyHeader = "";
+  for (int i = 0; i < header.length(); ++i) {
--- End diff --

Could use `StringUtils.rightPad` to avoid the awkward append blanks loop.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173918861
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import 
org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+EnumMap evaluatedArgs = 
LoadOptions.createConfig(cli);
+Map kafkaConfig = new HashMap<>();
+kafkaConfig.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+kafkaConfig.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+kafkaConfig.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConfig.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+if(LoadOptions.ZK.has(cli)) {
+  String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
+  kafkaConfig.put("bootstrap.servers", 
Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)));
--- End diff --

Same as before: ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG





---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173936212
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
 ---
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+public class BiasedSampler implements Sampler {
+  TreeMap> discreteDistribution;
+  public BiasedSampler(List>  
discreteDistribution, int max) {
+this.discreteDistribution = createDistribution(discreteDistribution, 
max);
+  }
+
+  public static List> readDistribution(File 
distrFile) throws IOException {
+List> ret = new ArrayList<>();
+System.out.println("Using biased sampler with the following biases:");
+try(BufferedReader br = new BufferedReader(new FileReader(distrFile))) 
{
+  int sumLeft = 0;
+  int sumRight = 0;
+  for(String line = null;(line = br.readLine()) != null;) {
+if(line.startsWith("#")) {
+  continue;
+}
+Iterable it = Splitter.on(",").split(line.trim());
+int left = Integer.parseInt(Iterables.getFirst(it, null));
+int right = Integer.parseInt(Iterables.getLast(it, null));
+System.out.println("\t" + left + "% of templates will comprise 
roughly " + right + "% of sample output");
+ret.add(new AbstractMap.SimpleEntry<>(left, right));
+sumLeft += left;
+sumRight += right;
+  }
+  if(sumLeft > 100 || sumRight > 100 ) {
+throw new IllegalStateException("Neither columns must sum to 
beyond 100.  " +
+"The first column is the % of templates. " +
+"The second column is the % of the sample that % of 
template occupies.");
+  }
+  else if(sumLeft < 100 && sumRight < 100) {
+int left = 100 - sumLeft;
+int right = 100 - sumRight;
+System.out.println("\t" + left + "% of templates will comprise 
roughly " + right + "% of sample output");
+ret.add(new AbstractMap.SimpleEntry<>(left, right));
+  }
+  return ret;
+}
+  }
+
+  private static TreeMap>
+ createDistribution(List>  
discreteDistribution, int max) {
+TreeMap> ret = new TreeMap<>();
+int from = 0;
+double weight = 0.0d;
+for(Map.Entry kv : discreteDistribution) {
+  double pctVals = kv.getKey()/100.0;
+  int to = from + (int)(max*pctVals);
+  double pctWeight = kv.getValue()/100.0;
+  ret.put(weight, new AbstractMap.SimpleEntry<>(from, to));
+  weight += pctWeight;
+  from = to;
+}
+return ret;
+  }
+
+  @Override
+  public int sample(Random rng, int limit) {
+double weight = rng.nextDouble();
+Map.Entry range = 
discreteDistribution.floorEntry(weight).getValue();
+return rng.nextInt(range.getValue() - range.getKey()) + range.getKey();
--- End diff --

Jerks like me who provide negatives or zeroes cause ugly exceptions to 
start showing up. because the nextInt doesn't get happy values.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173928769
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public class Writer {
+
+  private int summaryLookback;
+  private List summaries = new ArrayList<>();
+  private List writers;
+  private List monitors;
+
+  public Writer(List monitors, int summaryLookback, 
List writers) {
+this.summaryLookback = summaryLookback;
+this.writers = writers;
+this.monitors = monitors;
+for(AbstractMonitor m : monitors) {
+  this.summaries.add(new LinkedList<>());
+}
+  }
+
+  public void writeAll() {
+int i = 0;
+Date dateOf = new Date();
+List results = new ArrayList<>();
+for(AbstractMonitor m : monitors) {
+  Long eps = m.get();
+  if(eps != null) {
+if (summaryLookback > 0) {
+  LinkedList summary = summaries.get(i);
+  addToLookback(eps == null ? Double.NaN : eps.doubleValue(), 
summary);
--- End diff --

eps is never null here, so this can be just 
`addToLookback(eps.doubleValue(), summary);`


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173926217
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java
 ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public abstract class AbstractMonitor implements Supplier, 
MonitorNaming {
+  private static final double EPSILON = 1e-6;
+  protected Optional kafkaTopic;
+  protected long timestampPrevious = 0;
+  public AbstractMonitor(Optional kafkaTopic) {
+this.kafkaTopic = kafkaTopic;
+  }
+
+  protected abstract Long monitor(double deltaTs);
+
+  @Override
+  public Long get() {
+long timeStarted = System.currentTimeMillis();
--- End diff --

I'd probably use `System.nanoTime` here (if it's not going to cause perf 
issues).  `System.currentTimeMillis` is, iirc, subject entirely to the system 
clock, which can be hit with daylight savings time and such.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173929511
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public class Writer {
+
+  private int summaryLookback;
+  private List summaries = new ArrayList<>();
+  private List writers;
+  private List monitors;
+
+  public Writer(List monitors, int summaryLookback, 
List writers) {
+this.summaryLookback = summaryLookback;
+this.writers = writers;
+this.monitors = monitors;
+for(AbstractMonitor m : monitors) {
+  this.summaries.add(new LinkedList<>());
+}
+  }
+
+  public void writeAll() {
+int i = 0;
+Date dateOf = new Date();
+List results = new ArrayList<>();
+for(AbstractMonitor m : monitors) {
+  Long eps = m.get();
+  if(eps != null) {
+if (summaryLookback > 0) {
+  LinkedList summary = summaries.get(i);
+  addToLookback(eps == null ? Double.NaN : eps.doubleValue(), 
summary);
+  results.add(new Results(m.format(), m.name(), eps, 
Optional.of(getStats(summary;
+}
+else {
+  results.add(new Results(m.format(),m.name(), eps, 
Optional.empty()));
+}
+  }
+  else {
--- End diff --

Since the two else's are the same, I think this should simplify down to 
(with the above change):
```
  if (eps != null && summaryLookback > 0) {
LinkedList summary = summaries.get(i);
addToLookback(eps.doubleValue(), summary);
results.add(new Results(m.format(), m.name(), eps, 
Optional.of(getStats(summary;
  } else {
results.add(new Results(m.format(), m.name(), eps, 
Optional.empty()));
  }
```


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173933259
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/UnbiasedSampler.java
 ---
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import java.util.Random;
+
+public class UnbiasedSampler implements Sampler {
+
+  @Override
+  public int sample(Random rng, int limit) {
+return rng.nextInt(limit);
--- End diff --

If the templates file is empty, there will be no bounds here, and this will 
start puking exceptions out.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173930965
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
 ---
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+public class BiasedSampler implements Sampler {
+  TreeMap> discreteDistribution;
+  public BiasedSampler(List>  
discreteDistribution, int max) {
+this.discreteDistribution = createDistribution(discreteDistribution, 
max);
+  }
+
+  public static List> readDistribution(File 
distrFile) throws IOException {
+List> ret = new ArrayList<>();
+System.out.println("Using biased sampler with the following biases:");
+try(BufferedReader br = new BufferedReader(new FileReader(distrFile))) 
{
+  int sumLeft = 0;
+  int sumRight = 0;
+  for(String line = null;(line = br.readLine()) != null;) {
+if(line.startsWith("#")) {
+  continue;
+}
+Iterable it = Splitter.on(",").split(line.trim());
+int left = Integer.parseInt(Iterables.getFirst(it, null));
--- End diff --

I wouldn't expect it to happen, but left/right will throw a 
NumberFormatException if the iterables list is empty. Do we need/want to be 
more robust about it?


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173917605
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import 
org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
--- End diff --

Should this maybe be "metron.load.group" by default?  This is a little 
generic, and even though it's not terribly likely, could collide with someone 
else on the Kafka cluster.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173917240
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import 
org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+EnumMap evaluatedArgs = 
LoadOptions.createConfig(cli);
+Map kafkaConfig = new HashMap<>();
+kafkaConfig.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
--- End diff --

Kafka exposes these constants, rather than having to have them hardcoded.

ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173918976
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import 
org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+EnumMap evaluatedArgs = 
LoadOptions.createConfig(cli);
+Map kafkaConfig = new HashMap<>();
+kafkaConfig.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+kafkaConfig.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+kafkaConfig.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConfig.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+if(LoadOptions.ZK.has(cli)) {
+  String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
+  kafkaConfig.put("bootstrap.servers", 
Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)));
+}
+String groupId = 
evaluatedArgs.get(LoadOptions.CONSUMER_GROUP).get().toString();
+System.out.println("Consumer Group: " + groupId);
+kafkaConfig.put("group.id", groupId);
--- End diff --

Same as before: ConsumerGroup.GROUP_ID_CONFIG


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173925390
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
 ---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
--- End diff --

This class feels like it should be genericized as SendToKafka where 
it takes args as appropriate.  Right now this is just basically a 
SendToKafka, correct?


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173922548
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java
 ---
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.metron.performance.sampler.Sampler;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class MessageGenerator implements Supplier {
+  private static ThreadLocal rng = ThreadLocal.withInitial(() -> 
new Random());
+  private static AtomicLong guidOffset = new AtomicLong(0);
+  private static String guidPrefix = "6141faf6-a8ba-4044-ab80-";
--- End diff --

I assume this is being done to avoid the cost of creating an actual UUID?

Why bother though? Why not just use the guidOffset directly? Is it to be at 
least somewhat close in terms of data size / format? In that case, why not just 
have the prefix be all zeroes or something easier to quickly read at a glance?


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-12 Thread justinleet
Github user justinleet commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r173924485
  
--- Diff: 
metron-contrib/metron-performance/src/test/java/org/apache/metron/performance/load/SendToKafkaTest.java
 ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SendToKafkaTest {
+
+  @Test
+  public void testWritesCorrectNumber() throws InterruptedException {
--- End diff --

InterruptedException isn't thrown here, is it?


---