Re: ValueState is missing

2016-08-11 Thread Dong-iL, Kim
In my code, is the config of the ExecutionEnv alright?


> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim  wrote:
> 
> 
> my code and log is as below.
> 
> 
>val getExecuteEnv: StreamExecutionEnvironment = {
>val env = 
> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(1)
>env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>env.getCheckpointConfig.setCheckpointTimeout(6)
>env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 3))
>env
>}
> 
> def transform(target: DataStream[(String, String, String, String, Long)]): 
> DataStream[WinLossBase] =
>target.keyBy(_._3).flatMap(new StateOperator)
> 
> def main(args: Array[String]) {
>val env = getExecuteEnv
>val source: DataStream[String] = 
> extractFromKafka(env).name("KafkaSource")
>val json = deserializeToJsonObj(source).name("ConvertToJson")
>val target: DataStream[(String, String, String, String, Long)] = 
> preTransform(json)
>val result: DataStream[WinLossBase] = 
> transform(target).name("ToKeyedStream")
> …
> }
> 
> class StateOperator extends RichFlatMapFunction[(String, String, String, 
> String, Long), WinLossBase] {
>var playerState: ValueState[util.Map[String, PotPlayer]] = _
>var handState: ValueState[HandHistoryInfo] = _
> 
>override def open(param: Configuration): Unit = {
>val playerValueStateDescriptor = new 
> ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, 
> PotPlayer]())
>playerState = 
> getRuntimeContext.getState(playerValueStateDescriptor)
>handState = getRuntimeContext.getState(new 
> ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>}
> 
>override def flatMap(in: (String, String, String, String, Long), out: 
> Collector[WinLossBase]): Unit = {
>in._2 match {
>case "GameStartHistory" =>
>val players = playerState.value()
>val obj = _convertJsonToRecord(in._4, 
> classOf[GameStartHistoryRecord])
>val record = obj.asInstanceOf[GameStartHistoryRecord]
>val handHistoryInfo: HandHistoryInfo = 
> _setUpHandHistoryInfo(record)
>if (LOG.isInfoEnabled())
>LOG.info("hand start {}", if (handHistoryInfo != null) 
> handHistoryInfo.handHistoryId else "NULL")
>  ….
>playerState.update(players)
>handState.update(handHistoryInfo)
>case "HoleCardHistory" =>
>val players = playerState.value()
>if (players != null) {
>   ...
> playerState.update(players)
>} else LOG.warn("there is no player[hole card]. {}", in._4)
>case "PlayerStateHistory" =>
>val players = playerState.value()
>if (players != null) {
>   ….
>playerState.update(players)
>} else LOG.warn("there is no player[player state]. {}", 
> in._4)
>case "CommCardHistory" =>
>val handHistoryInfo = handState.value()
>val commCardHistory: CommCardHistory = 
> commCardState.value()
>if (handHistoryInfo != null) {
>   ...
>handState.update(handHistoryInfo)
>commCardState.update(commCardHistory)
>} else LOG.warn("there is no handhistory info[comm card]. 
> {}", in._4)
>case "PlayerActionHistory" =>
>val handHistoryInfo = handState.value()
>val players = playerState.value()
> 
>if (handHistoryInfo != null) {
>   ...
>} else LOG.warn("there is no handhistory info[player 
> action]. {}", in._4)
>case "PotHistory" =>
>val players = playerState.value()
>val handHistoryInfo = handState.value()
>val commCardHistory: CommCardHistory = 
> commCardState.value()
>if (handHistoryInfo != null && handHistoryInfo.playType == 
> PlayType.Cash && players != null && players.size > 1) {
>...
>} else LOG.warn("there is no handhistory info[pot]. {}", 
> in._4)
>case "GameEndHistory" =>
>val players = playerState.value()
>val handHistoryInfo = handState.value()
>   ...
>if 

Re: Does Flink DataStreams using combiners?

2016-08-11 Thread Sameer W
Sorry, I meant streaming cannot use combiners (corrected message repeated below)
---
Streaming cannot use combiners. The aggregations happen on the trigger.

The elements being aggregated are only known after the trigger delivers the
elements to the evaluation function.

Since windows can overlap, and since assignment to a window is not done until
the elements arrive at the sum operator in your case, a combiner could not know
what to pre-aggregate even if one were available.

On Thu, Aug 11, 2016 at 9:22 PM, Sameer Wadkar  wrote:

> Streaming cannot use windows. The aggregations happen on the trigger.
>
> The elements being aggregated are only known after the trigger delivers
> the elements to the evaluation function.
>
> Since windows can overlap and even assignment to a window is not done
> until the elements arrive at the sum operator in your case, combiner cannot
> know what to pre aggregate even if were available.
>
>
>
> > On Aug 11, 2016, at 8:51 PM, Elias Levy 
> wrote:
> >
> > I am wondering if Flink makes use of combiners to pre-reduce a keyed and
> windowed stream before shuffling the data among workers.
> >
> > I.e. will it use a combiner in something like:
> >
> > stream.flatMap {...}
> >   .assignTimestampsAndWatermarks(...)
> >   .keyBy(...)
> >   .timeWindow(...)
> >   .trigger(...)
> >   .sum("cnt")
> >
> > or will it shuffle the keyed input before the sum reduction?
> >
> > If it does make use of combiners, it would be useful to point this out
> in the documentation, particularly if it only applies to certain types of
> reducers, folds, etc.
>


RE: flink - Working with State example

2016-08-11 Thread Ramanan, Buvana (Nokia - US)
Kostas,
Good catch! That makes it work! Thank you so much for the help.
Regards,
Buvana

-Original Message-
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] 
Sent: Thursday, August 11, 2016 11:22 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Hi Buvana, 

At a first glance, your snapshotState() should return a Double.

Kostas
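
For reference, a minimal sketch of that fix (assuming the class declares "implements Checkpointed<Double>", as in the code below):

    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return prev_tuple;
    }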

> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) 
>  wrote:
> 
> Thank you Kostas & Ufuk. I get into the following compilation error when I 
> use checkpointed interface. Pasting the code & message as follows:
> 
> Is the Serializable definition supposed to be from java.io.Serializable or 
> somewhere else?
> 
> Thanks again,
> Buvana
> 
> ==
> ==
> Code:
> 
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> 
> import java.io.Serializable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import 
> org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
> 
> import java.util.Properties;
> 
> /**
> * Created by buvana on 8/9/16.
> */
> public class stateful {
>private static String INPUT_KAFKA_TOPIC = null;
> ---
> --- skipping the main as it’s the same as before except for class name 
> change -
> ---
>   public static class MapStateful extends RichFlatMapFunction Tuple2>
>implements Checkpointed {
> 
>private Double prev_tuple = null;
> 
>@Override
>public void flatMap(String incString, Collector Double>> out) {
>try {
>Double value = Double.parseDouble(incString);
>System.out.println("value = " + value);
>System.out.println(prev_tuple);
> 
>Double value2 = value - prev_tuple;
>prev_tuple = value;
> 
>Tuple2 tp = new Tuple2();
>tp.setField(INPUT_KAFKA_TOPIC, 0);
>tp.setField(value2, 1);
>out.collect(tp);
>} catch (NumberFormatException e) {
>System.out.println("Could not convert to Float" + incString);
>System.err.println("Could not convert to Float" + incString);
>}
>}
>@Override
>public void open(Configuration config) {
>if (prev_tuple == null) {
>// only recreate if null
>// restoreState will be called before open()
>// so this will already set the sum to the restored value
>prev_tuple = new Double("0.0");
>}
>}
> 
>@Override
>public Serializable snapshotState(
>long checkpointId,
>long checkpointTimestamp) throws Exception {
>return prev_tuple;
>}
> 
> 
>@Override
>public void restoreState(Double state) {
>prev_tuple = state;
>}
>}
> }
> ==
> =
> ERROR message while building:
> 
> $ mvn clean package
> [INFO] Scanning for projects...
> [INFO]
>  
> [INFO] 
> --
> -- [INFO] Building Flink Quickstart Job 0.1 [INFO] 
> --
> -- [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has 
> been relocated to commons-io:commons-io:jar:1.3.2 [INFO] [INFO] --- 
> maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits --- [INFO] 
> Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
> [INFO]
> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ 
> wiki-edits --- [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 1 resource
> [INFO]
> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ 
> wiki-edits --- [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 7 source files to 
> 

Does Flink DataStreams using combiners?

2016-08-11 Thread Elias Levy
I am wondering if Flink makes use of combiners to pre-reduce a keyed and
windowed stream before shuffling the data among workers.

I.e. will it use a combiner in something like:

stream.flatMap {...}
  .assignTimestampsAndWatermarks(...)
  .keyBy(...)
  .timeWindow(...)
  .trigger(...)
  .sum("cnt")

or will it shuffle the keyed input before the sum reduction?

If it does make use of combiners, it would be useful to point this out in
the documentation, particularly if it only applies to certain types of
reducers, folds, etc.


Re: Is java.sql.Timestamp fully suported in Flink SQL?

2016-08-11 Thread Timo Walther

Hi Davran,

Unfortunately, you found a bug. I created an issue for it
(https://issues.apache.org/jira/browse/FLINK-4385). You could convert the
timestamp to a long value as a workaround.


Table table1 = tableEnv.fromDataSet(dataSet1);
Table table2 = tableEnv.fromDataSet(dataSet2);
Table table = 
table1.select("t.cast(LONG)").union(table2.select("t.cast(LONG)"));


I hope that helps. Sorry for the inconvenience.

Timo


On 11/08/16 at 18:28, Davran Muzafarov wrote:


I have two tables created from data sets:

List<MarketDataInfo> infos0 = .

List<MarketDataInfo> infos1 = .

DataSet<MarketDataInfo> dataSet0 = env.fromCollection( infos0 );

DataSet<MarketDataInfo> dataSet1 = env.fromCollection( infos1 );

tableEnv.registerDataSet( "table0", dataSet0 );

tableEnv.registerDataSet( "table1", dataSet1 );

Table table = tableEnv.sql( "select * from table0 union select * from 
table1" );


DataSet<Row> redyData = tableEnv.toDataSet( table, Row.class );

If "MarketDataInfo" has only String, Float, or Integer fields, "toDataSet"
works.


If MarketDataInfo has Timestamp, I am getting:

Internal error: Error occurred while applying rule DataSetAggregateRule

at org.apache.calcite.util.Util.newInternal(Util.java:792)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)


at 
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)


at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:118)


at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:214)


at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:825)


at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)


at 
org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:253)


at 
org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)


...

Caused by: org.apache.flink.api.table.TableException: Unsupported data 
type encountered


at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:65)


at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:53)


at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)


at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)


at 
scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)


at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.estimateRowSize(DataSetRel.scala:53)


at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.estimateRowSize(DataSetAggregate.scala:38)


at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.computeSelfCost(DataSetAggregate.scala:80)


at 
org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)


at 
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
Source)


at 
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown 
Source)


at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)


at 
org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1134)


at 
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)


at 
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)


at 
org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1838)


at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1774)


at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038)


at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058)


at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1950)


at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:137)


... 35 more

Am I missing something?

Thank you,

Davran.




--
Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr



Re: Firing windows multiple times

2016-08-11 Thread Shannon Carey
"If Window B is a Folding Window and does not have an evictor then it should 
not keep the list of all received elements."

Agreed! Upon closer inspection, the behavior I'm describing is only present 
when using EvictingWindowOperator, not when using WindowOperator. I misread 
line 382 of WindowOperator which calls windowState.add(): in actuality, the 
windowState is a FoldingState which incorporates the user-provided fold 
function in order to eagerly fold the data. In contrast, if you use an evictor, 
EvictingWindowOperator has the behavior I describe.

I am already using a custom Trigger which uses a processing timer to FIRE a 
short time after a new event comes in, and an event timer to FIRE_AND_PURGE.
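
For reference, a rough sketch of such a trigger against the 1.1-era Trigger API (class name and delay are placeholders; details are elided):

    // assumes org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    // and org.apache.flink.streaming.api.windowing.windows.TimeWindow
    public class EarlyAndFinalTrigger extends Trigger<Object, TimeWindow> {
        private final long earlyFireDelayMs = 1000L; // placeholder early-firing delay

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            // final firing (and purge) when the watermark reaches the end of the window
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // intermediate firing a short time after an element arrives
            ctx.registerProcessingTimeTimer(System.currentTimeMillis() + earlyFireDelayMs);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE; // emit intermediate result, keep the window contents
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }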

It seems that I can achieve the desired effect by avoiding use of an evictor so 
that the intermediate events are not retained in an EvictingWindowOperator's 
state, and perform any necessary eviction within my fold function. This has the 
aforementioned drawbacks of the windowed fold function not knowing about 
watermarks, and therefore it is difficult to be precise about choosing which 
items to evict. However, this seems to be the best choice within the current 
framework.

Interestingly, it appears that TimeEvictor doesn't really know about watermarks 
either. When a window emits an event, regardless of how it was fired, it is 
assigned the timestamp given by its window's maxTimestamp(), which might be 
much greater than the processing time that actually fired the event. Then, 
TimeEvictor compares the max timestamp of all items in the window against the 
other ones in order to determine which ones to evict. Basically, it assumes 
that the events were emitted due to the window terminating with FIRE_AND_PURGE. 
What if we gave more information (specifically, the current watermark) to the 
evictor in order to allow it to deal with a mix of intermediate events (fired 
by processing time) and final events (fired by event time when the watermark 
reaches the window)? That value is already available in the WindowOperator & 
could be passed to the Evictor very easily. It would be an API change, of 
course.
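
To illustrate the proposal, the change could be as small as one extra parameter on the evict method (a hypothetical signature, not an existing API; the 1.1-era method is roughly int evict(Iterable<StreamRecord<T>> elements, int size, W window)):

    int evict(Iterable<StreamRecord<T>> elements, int size, long currentWatermark, W window);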

Other than that, is it worth considering a change to EvictingWindowOperator to 
allow user-supplied functions to reduce the size of its state when people fire 
upstream windows repeatedly? From what I see when I monitor the state with 
debugger print statements, the EvictingWindowOperator is definitely holding on 
to all the elements ever received, not just the aggregated result. You can see 
this clearly because EvictingWindowOperator holds a ListState instead of a 
FoldingState. The user-provided fold function is only applied upon fire().

-Shannon




Is java.sql.Timestamp fully suported in Flink SQL?

2016-08-11 Thread Davran Muzafarov
I have two tables created from data sets:

 

List<MarketDataInfo> infos0 = .

 

List<MarketDataInfo> infos1 = .

 

 

DataSet<MarketDataInfo> dataSet0 = env.fromCollection( infos0 );

 

DataSet<MarketDataInfo> dataSet1 = env.fromCollection( infos1 );

 

 

tableEnv.registerDataSet( "table0", dataSet0 );

tableEnv.registerDataSet( "table1", dataSet1 );

 

 

Table table = tableEnv.sql( "select * from table0 union select * from
table1" );

 

 

DataSet<Row> redyData = tableEnv.toDataSet( table, Row.class );

 

 

If "MarketDataInfo" have only String, Floats or Integers fields "toDataSet"
works. 

If MarketDataInfo has Timestamp, I am getting:

 

 

 

Internal error: Error occurred while applying rule DataSetAggregateRule

at org.apache.calcite.util.Util.newInternal(Util.java:792)

at
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.
java:149)

at
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)

at
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:118)

at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java
:214)

at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.ja
va:825)

at
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)

at
org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnviron
ment.scala:253)

at
org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEn
vironment.scala:146)

...

Caused by: org.apache.flink.api.table.TableException: Unsupported data type
encountered

at
org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRo
wSize$2.apply(DataSetRel.scala:65)

at
org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRo
wSize$2.apply(DataSetRel.scala:53)

at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:5
1)

at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scal
a:60)

at
scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)

at
org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.estimateRowSi
ze(DataSetRel.scala:53)

at
org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.estimateRowSi
ze(DataSetAggregate.scala:38)

at
org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.computeSelfCo
st(DataSetAggregate.scala:80)

at
org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulative
Cost(RelMdPercentageOriginalRows.java:162)

at
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
Source)

at
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
Source)

at
org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMet
adataQuery.java:258)

at
org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1
134)

at
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubs
et.java:336)

at
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubse
t.java:319)

at
org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.ja
va:1838)

at
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.j
ava:1774)

at
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:
1038)

at
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlann
er.java:1058)

at
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlann
er.java:1950)

at
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.
java:137)

... 35 more

 

 

Am I missing something?

 

Thank you,

Davran.

 



Unit tests failing, losing stream contents

2016-08-11 Thread Ciar, David B.
Hi everyone,


I've been trying to write unit tests for my data stream bolts (map, flatMap, 
apply etc.), however the results I've been getting are strange.  The code for 
testing is here (running with scalatest and sbt):


https://gist.github.com/dbciar/7469adfea9e6442cdc9568aed07095ff


It runs the stream processing environment once for each check. For one of the
checks (output below) I get an "IllegalStateException: Factory has already been
initialized", whose cause I'm not sure of, while for the rest I get an
IndexOutOfBoundsException.


The index exception is strange: the index positions refer to the same number of
input tuples fed to the stream, so it is as if some are being lost, or the
assert is running before the stream has finished processing and adding objects
to the rawStreamOutput: Iterator[RawObservation] object.


Any pointers on what might be happening would be appreciated. I would also
welcome suggestions on how to incorporate the Redis server in this check, as I
had to comment out the server definition here and run it separately to get to
the current position.


The two types of exception are first this:


[info] - Do well-formed observations parse OK? *** FAILED ***
[info]   java.lang.IllegalStateException: Factory has already been initialized
[info]   at 
org.apache.flink.core.memory.MemorySegmentFactory.initializeFactory(MemorySegmentFactory.java:132)
[info]   at 
org.apache.flink.runtime.taskmanager.TaskManager$.parseTaskManagerConfiguration(TaskManager.scala:2055)
[info]   at 
org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1802)
[info]   at 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:142)
[info]   at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:319)
[info]   at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:312)
[info]   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.immutable.Range.foreach(Range.scala:160)
[info]   at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)


The rest are as follows, with the index being the position in the 
rawStreamOutput Iterator[RawObservation] object I expect:


[info]   java.lang.IndexOutOfBoundsException: 3
[info]   at 
scala.collection.immutable.Vector.checkRangeConvert(Vector.scala:132)
[info]   at scala.collection.immutable.Vector.apply(Vector.scala:122)
[info]   at 
ProcessingBoltTest$$anonfun$5$$anon$7.<init>(ProcessingBoltTest.scala:93)
[info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
[info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)




Thanks,

David



This message (and any attachments) is for the recipient only. NERC is subject 
to the Freedom of Information Act 2000 and the contents of this email and any 
reply you make may be disclosed by NERC unless it is exempt from release under 
the Act. Any material supplied to NERC may be stored in an electronic records 
management system.



Re: flink - Working with State example

2016-08-11 Thread Kostas Kloudas
Hi Buvana, 

At a first glance, your snapshotState() should return a Double.

Kostas

> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) 
>  wrote:
> 
> Thank you Kostas & Ufuk. I get into the following compilation error when I 
> use checkpointed interface. Pasting the code & message as follows:
> 
> Is the Serializable definition supposed to be from java.io.Serializable or 
> somewhere else?
> 
> Thanks again,
> Buvana
> 
> 
> Code:
> 
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> 
> import java.io.Serializable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
> 
> import java.util.Properties;
> 
> /**
> * Created by buvana on 8/9/16.
> */
> public class stateful {
>private static String INPUT_KAFKA_TOPIC = null;
> ---
> --- skipping the main as it’s the same as before except for class name change 
> -
> ---
>   public static class MapStateful extends RichFlatMapFunction Tuple2>
>implements Checkpointed {
> 
>private Double prev_tuple = null;
> 
>@Override
>public void flatMap(String incString, Collector Double>> out) {
>try {
>Double value = Double.parseDouble(incString);
>System.out.println("value = " + value);
>System.out.println(prev_tuple);
> 
>Double value2 = value - prev_tuple;
>prev_tuple = value;
> 
>Tuple2 tp = new Tuple2();
>tp.setField(INPUT_KAFKA_TOPIC, 0);
>tp.setField(value2, 1);
>out.collect(tp);
>} catch (NumberFormatException e) {
>System.out.println("Could not convert to Float" + incString);
>System.err.println("Could not convert to Float" + incString);
>}
>}
>@Override
>public void open(Configuration config) {
>if (prev_tuple == null) {
>// only recreate if null
>// restoreState will be called before open()
>// so this will already set the sum to the restored value
>prev_tuple = new Double("0.0");
>}
>}
> 
>@Override
>public Serializable snapshotState(
>long checkpointId,
>long checkpointTimestamp) throws Exception {
>return prev_tuple;
>}
> 
> 
>@Override
>public void restoreState(Double state) {
>prev_tuple = state;
>}
>}
> }
> ===
> ERROR message while building:
> 
> $ mvn clean package
> [INFO] Scanning for projects...
> [INFO]
>  
> [INFO] 
> 
> [INFO] Building Flink Quickstart Job 0.1
> [INFO] 
> 
> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been 
> relocated to commons-io:commons-io:jar:1.3.2
> [INFO] 
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
> [INFO] 
> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ 
> wiki-edits ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 1 resource
> [INFO] 
> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits 
> ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 7 source files to 
> /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> [INFO] -
> [ERROR] COMPILATION ERROR : 
> [INFO] -
> [ERROR] 
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19]
>  

Re: Firing windows multiple times

2016-08-11 Thread Aljoscha Krettek
Hi Shannon,
thanks for the clarification. If Window B is a Folding Window and does not
have an evictor then it should not keep the list of all received elements.
Could you maybe post the section of the log that shows what window operator
is used for Window B? I'm looking for something like this:

08/11/2016 17:18:50 TriggerWindow(TumblingEventTimeWindows(4000),
FoldingStateDescriptor{serializer=null, initialValue=0,
foldFunction=org.apache.flink.streaming.examples.windowing.WindowWordCount$1@e73f9ac},
EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:436)) -> Sink:
Unnamed(6/8) switched to STARTING

This exactly shows what kind of operator with what kind of underlying
window state is being used.

Cheers,
Aljoscha


On Thu, 11 Aug 2016 at 14:27 Kostas Kloudas 
wrote:

> Just to add a drawback in solution 2) you may have some issues because
> window boundaries may not
> be aligned. For example the elements of a day window may be split between
> the last day of a month
> and the first of the next month.
>
> Kostas
>
> On Aug 11, 2016, at 2:21 PM, Kostas Kloudas 
> wrote:
>
> Hi Shanon,
>
> From what I understand, you want to have your results windowed by
> different different durations, e.g. by minute, by day,
> by month and you use the evictor to  decide which elements should go into
> each window. If I am correct, then I do not
> think that you need the evictor which bounds you to keep all the elements
> that the operator has seen (because it uses a listState).
>
> In this case you can do one of the following:
>
> 1) if you just want to have the big window (by month) and all the smaller
> ones to appear as early firings of the big one, then I would
> suggest you to go with a custom trigger. The trigger has access to
> watermarks, can register both event and processing time timers (so you can
> have firings whenever you want (per minute, per day, etc), can have state
> (e.g.element counter), and can decide to FIRE or FIRE_AND_PURGE.
>
> The only downside is that all intermediate firings will appear to belong
> to the big window. This means that the beginning and the end of the
> by-minute and daily firings will be those of the month that they belong to.
> If this is not a problem, I would go for that.
>
> 2) If the above is a problem, then what you can do, is key your input
> stream and then have 3 different windowing strategies, e.g. by minute, by
> day and by month. This way you will have also the desired window
> boundaries. This would look like:
>
> keyedStream.timeWindow(byMonth).addSink …
> keyedStream.timeWindow(byDay).addSink …
> keyedStream.timeWindow(byMinute).addSink …
>
> Please let us know if this answers your question and if you need any more
> help.
>
> Kostas
>
>
> On Aug 10, 2016, at 10:15 PM, Shannon Carey  wrote:
>
> Hi Aljoscha,
>
> Yes, I am using an Evictor, and I think I have seen the problem you are
> referring to. However, that's not what I'm talking about.
>
> If you re-read my first email, the main point is the following: if users
> desire updates more frequently than window watermarks are reached, then
> window state behaves suboptimally. It doesn't matter if there's an evictor
> or not. Specifically:
>
> If I have a windows "A" that I fire multiple times in order to provide
> incremental results as data comes in instead of waiting for the watermark
> to purge the window
> And that window's events are gathered into another, bigger window "B"
> And I want to keep only the latest event from each upstream window "A" (by
> timestamp, where each window pane has its own timestamp)
> Even if I have a fold/reduce method on the bigger window "B" to make sure
> that each updated event from "A" overwrites the previous event (by
> timestamp)
> Window "B" will hold in state all events from windows "A", including all
> the incremental events that were fired by processing-time triggers, even
> though I don't actually need those events because the reducer gets rid of
> them
>
> An example description of execution flow:
>
>1. Event x
>2. Window A receives event, trigger waits for processing time delay,
>then emits event x(time=1, count=1)
>3. Window B receives event, trigger waits for processing time delay,
>then executes fold() and emits event(time=1 => count=1), but internal
>Window state looks like *[x(time=1, count=1)]*
>4. Event y
>5. Window A receives event, trigger '', then emits event y(time=1,
>count=2)
>6. Window B receives event, trigger '', then executes fold() and emits
>event(time=1 => count=2), but internal Window state looks like *[x(time=1,
>count=1), y(time=1, count=2)]*
>7. Watermark z
>8. Window A receives watermark, trigger's event timer is reached,
>fires and purges and emits current state as event z(time=1, count=2)
>9. Window B receives event, trigger waits for processing time delay,
>then executes fold() and emits 

RE: flink - Working with State example

2016-08-11 Thread Ramanan, Buvana (Nokia - US)
Thank you Kostas & Ufuk. I run into the following compilation error when I use
the Checkpointed interface. Pasting the code & message below:

Is the Serializable definition supposed to be from java.io.Serializable or 
somewhere else?

Thanks again,
Buvana


Code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.functions.RichFlatMapFunction;

import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Created by buvana on 8/9/16.
 */
public class stateful {
private static String INPUT_KAFKA_TOPIC = null;
---
--- skipping the main as it’s the same as before except for class name change 
-
---
public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
implements Checkpointed<Double> {

private Double prev_tuple = null;

@Override
public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
try {
Double value = Double.parseDouble(incString);
System.out.println("value = " + value);
System.out.println(prev_tuple);

Double value2 = value - prev_tuple;
prev_tuple = value;

Tuple2<String, Double> tp = new Tuple2<>();
tp.setField(INPUT_KAFKA_TOPIC, 0);
tp.setField(value2, 1);
out.collect(tp);
} catch (NumberFormatException e) {
System.out.println("Could not convert to Float" + incString);
System.err.println("Could not convert to Float" + incString);
}
}
@Override
public void open(Configuration config) {
if (prev_tuple == null) {
// only recreate if null
// restoreState will be called before open()
// so this will already set the sum to the restored value
prev_tuple = new Double("0.0");
}
}

@Override
public Serializable snapshotState(
long checkpointId,
long checkpointTimestamp) throws Exception {
return prev_tuple;
}


@Override
public void restoreState(Double state) {
prev_tuple = state;
}
}
}
===
ERROR message while building:

$ mvn clean package
[INFO] Scanning for projects...
[INFO] 
[INFO] 
[INFO] Building Flink Quickstart Job 0.1
[INFO] 
[WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been 
relocated to commons-io:commons-io:jar:1.3.2
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
[INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
[INFO] 
[INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ 
wiki-edits ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 7 source files to 
/home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
[INFO] -
[ERROR] COMPILATION ERROR : 
[INFO] -
[ERROR] 
/home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19]
 wikiedits.stateful.MapStateful is not abstract and does not override abstract 
method snapshotState(long,long) in 
org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] 
/home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29]
 snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement 
snapshotState(long,long) in 

Re: flink - Working with State example

2016-08-11 Thread Kostas Kloudas
Exactly as Ufuk suggested, if you are not grouping your stream by key, 
you should use the checkpointed interface.

The reason I asked before if you are using the keyBy() is because this is the 
one that
implicitly sets the keySerializer and scopes your (keyed) state to a specific 
key.

If there is no keying, then keyed state cannot be used and the Checkpointed 
interface 
should be used instead. 
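
For illustration, a minimal sketch of adding such a keying step in front of the flatMap from the code quoted below (the key extractor is a placeholder; a real job would derive a meaningful key from each record):

    DataStream<Tuple2<String, Double>> streamTuples = stream
        .keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String record) throws Exception {
                return INPUT_KAFKA_TOPIC; // placeholder key
            }
        })
        .flatMap(new Rec2Tuple2());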

Let us know if you need anything else.

Kostas

> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi  wrote:
> 
> This only works for keyed streams, you have to use keyBy().
> 
> You can use the Checkpointed interface instead
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
> 
> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
>  wrote:
>> Hi Kostas,
>> 
>> 
>> 
>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t]
>> is the current value of the incoming sample and x[t-1] is the previous value
>> of the incoming sample. I store the current value in state store
>> (‘prev_tuple’) so that I can use it for computation in next cycle. As you
>> may observe, I am not using keyBy. I am simply printing out the resultant
>> tuple.
>> 
>> 
>> 
>> It appears from the error message that I have to set the key serializer (and
>> possibly value serializer) for the state store. I am not sure how to do
>> that…
>> 
>> 
>> 
>> Thanks for your interest in helping,
>> 
>> 
>> 
>> 
>> 
>> Regards,
>> 
>> Buvana
>> 
>> 
>> 
>> public class stateful {
>> 
>>private static String INPUT_KAFKA_TOPIC = null;
>> 
>>private static int TIME_WINDOW = 0;
>> 
>> 
>> 
>>public static void main(String[] args) throws Exception {
>> 
>> 
>> 
>>if (args.length < 2) {
>> 
>>throw new IllegalArgumentException("The application needs two
>> arguments. The first is the name of the kafka topic from which it has to \n"
>> 
>>+ "fetch the data. The second argument is the size of
>> the window, in seconds, to which the aggregation function must be applied.
>> \n");
>> 
>>}
>> 
>> 
>> 
>>INPUT_KAFKA_TOPIC = args[0];
>> 
>>TIME_WINDOW = Integer.parseInt(args[1]);
>> 
>> 
>> 
>>Properties properties = null;
>> 
>> 
>> 
>>properties = new Properties();
>> 
>>properties.setProperty("bootstrap.servers", "localhost:9092");
>> 
>>properties.setProperty("zookeeper.connect", "localhost:2181");
>> 
>>properties.setProperty("group.id", "test");
>> 
>> 
>> 
>>StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>>//env.setStateBackend(new
>> FsStateBackend("file://home/buvana/flink/checkpoints"));
>> 
>> 
>> 
>>DataStreamSource stream = env
>> 
>>.addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new
>> SimpleStringSchema(), properties));
>> 
>> 
>> 
>>// maps the data into Flink tuples
>> 
>>DataStream> streamTuples = stream.flatMap(new
>> Rec2Tuple2());
>> 
>> 
>> 
>>// write the result to the console or in a Kafka topic
>> 
>>streamTuples.print();
>> 
>> 
>> 
>>env.execute("plus one");
>> 
>> 
>> 
>>}
>> 
>> 
>> 
>>public static class Rec2Tuple2 extends RichFlatMapFunction> Tuple2 > {
>> 
>>private transient ValueState> prev_tuple;
>> 
>> 
>> 
>>@Override
>> 
>>public void flatMap(String incString, Collector> Double>> out) throws Exception {
>> 
>>try {
>> 
>>Double value = Double.parseDouble(incString);
>> 
>>System.out.println("value = " + value);
>> 
>>Tuple2 prev_stored_tp = prev_tuple.value();
>> 
>>System.out.println(prev_stored_tp);
>> 
>> 
>> 
>>Double value2 = value - prev_stored_tp.f1;
>> 
>>prev_stored_tp.f1 = value;
>> 
>>prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>> 
>>prev_tuple.update(prev_stored_tp);
>> 
>> 
>> 
>>Tuple2 tp = new Tuple2();
>> 
>>tp.setField(INPUT_KAFKA_TOPIC, 0);
>> 
>>tp.setField(value2, 1);
>> 
>>out.collect(tp);
>> 
>> 
>> 
>>} catch (NumberFormatException e) {
>> 
>>System.out.println("Could not convert to Float" +
>> incString);
>> 
>>System.err.println("Could not convert to Float" +
>> incString);
>> 
>>}
>> 
>>}
>> 
>> 
>> 
>>@Override
>> 
>>public void open(Configuration config) {
>> 
>>ValueStateDescriptor> descriptor =
>> 
>>new ValueStateDescriptor<>(
>> 
>>"previous input value", // the state name
>> 
>>  

Re: flink - Working with State example

2016-08-11 Thread Ufuk Celebi
This only works for keyed streams, you have to use keyBy().

You can use the Checkpointed interface instead
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).

On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
 wrote:
> Hi Kostas,
>
>
>
> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t]
> is the current value of the incoming sample and x[t-1] is the previous value
> of the incoming sample. I store the current value in state store
> (‘prev_tuple’) so that I can use it for computation in next cycle. As you
> may observe, I am not using keyBy. I am simply printing out the resultant
> tuple.
>
>
>
> It appears from the error message that I have to set the key serializer (and
> possibly value serializer) for the state store. I am not sure how to do
> that…
>
>
>
> Thanks for your interest in helping,
>
>
>
>
>
> Regards,
>
> Buvana
>
>
>
> public class stateful {
>
> private static String INPUT_KAFKA_TOPIC = null;
>
> private static int TIME_WINDOW = 0;
>
>
>
> public static void main(String[] args) throws Exception {
>
>
>
> if (args.length < 2) {
>
> throw new IllegalArgumentException("The application needs two
> arguments. The first is the name of the kafka topic from which it has to \n"
>
> + "fetch the data. The second argument is the size of
> the window, in seconds, to which the aggregation function must be applied.
> \n");
>
> }
>
>
>
> INPUT_KAFKA_TOPIC = args[0];
>
> TIME_WINDOW = Integer.parseInt(args[1]);
>
>
>
> Properties properties = null;
>
>
>
> properties = new Properties();
>
> properties.setProperty("bootstrap.servers", "localhost:9092");
>
> properties.setProperty("zookeeper.connect", "localhost:2181");
>
> properties.setProperty("group.id", "test");
>
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> //env.setStateBackend(new
> FsStateBackend("file://home/buvana/flink/checkpoints"));
>
>
>
> DataStreamSource stream = env
>
> .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new
> SimpleStringSchema(), properties));
>
>
>
> // maps the data into Flink tuples
>
> DataStream> streamTuples = stream.flatMap(new
> Rec2Tuple2());
>
>
>
> // write the result to the console or in a Kafka topic
>
> streamTuples.print();
>
>
>
> env.execute("plus one");
>
>
>
> }
>
>
>
> public static class Rec2Tuple2 extends RichFlatMapFunction Tuple2 > {
>
> private transient ValueState> prev_tuple;
>
>
>
> @Override
>
> public void flatMap(String incString, Collector Double>> out) throws Exception {
>
> try {
>
> Double value = Double.parseDouble(incString);
>
> System.out.println("value = " + value);
>
> Tuple2 prev_stored_tp = prev_tuple.value();
>
> System.out.println(prev_stored_tp);
>
>
>
> Double value2 = value - prev_stored_tp.f1;
>
> prev_stored_tp.f1 = value;
>
> prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>
> prev_tuple.update(prev_stored_tp);
>
>
>
> Tuple2 tp = new Tuple2();
>
> tp.setField(INPUT_KAFKA_TOPIC, 0);
>
> tp.setField(value2, 1);
>
> out.collect(tp);
>
>
>
> } catch (NumberFormatException e) {
>
> System.out.println("Could not convert to Float" +
> incString);
>
> System.err.println("Could not convert to Float" +
> incString);
>
> }
>
> }
>
>
>
> @Override
>
> public void open(Configuration config) {
>
> ValueStateDescriptor> descriptor =
>
> new ValueStateDescriptor<>(
>
> "previous input value", // the state name
>
> TypeInformation.of(new TypeHint Double>>() {}), // type information
>
> Tuple2.of("test topic", 0.0)); // default value
> of the state, if nothing was set
>
> prev_tuple = getRuntimeContext().getState(descriptor);
>
> }
>
> }
>
> }
>
>
>
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> Sent: Thursday, August 11, 2016 5:45 AM
> To: user@flink.apache.org
> Subject: Re: flink - Working with State example
>
>
>
> Hello Buvana,
>
>
>
> Can you share a bit more details on your operator and how you are using it?
>
> For example, are you using keyBy before using you custom operator?
>
>
>
> Thanks a lot,
>
> Kostas
>
>
>
> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia 

Re: specify user name when connecting to hdfs

2016-08-11 Thread Ufuk Celebi
Do you also set fs.hdfs.hadoopconf in flink-conf.yaml
(https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#common-options)?
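
For reference, that is a single entry in flink-conf.yaml pointing at the directory that contains core-site.xml and hdfs-site.xml (the path below is a placeholder):

    fs.hdfs.hadoopconf: /path/to/hadoop/conf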

On Thu, Aug 11, 2016 at 2:47 PM, Dong-iL, Kim  wrote:
> Hi.
> In this case , I used standalone cluster(aws EC2) and I wanna connect to
> remote HDFS machine(aws EMR).
> I register the location of core-site.xml as below.
> does it need other properties?
>
> 
> 
> fs.defaultFS
> hdfs://…:8020
> 
> 
> hadoop.security.authentication
> simple
> 
> 
> hadoop.security.key.provider.path
> kms://:9700/kms
> 
> 
> hadoop.job.ugi
> hadoop
> 
>
> Thanks.
>
> On Aug 11, 2016, at 9:31 PM, Stephan Ewen  wrote:
>
> Hi!
>
> Do you register the Hadoop Config at the Flink Configuration?
> Also, do you use Flink standalone or on Yarn?
>
> Stephan
>
> On Tue, Aug 9, 2016 at 11:00 AM, Dong-iL, Kim  wrote:
>>
>> Hi.
>> I’m trying to set external hdfs as state backend.
>> my os user name is ec2-user. hdfs user is hadoop.
>> there is a permission denied exception.
>> I wanna specify hdfs user name.
>> I set hadoop.job.ugi in core-site.xml and HADOOP_USER_NAME on command
>> line.
>> but not works.
>> what shall I do?
>> thanks.
>
>
>


RE: flink - Working with State example

2016-08-11 Thread Ramanan, Buvana (Nokia - US)
Hi Kostas,

Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t] is 
the current value of the incoming sample and x[t-1] is the previous value of 
the incoming sample. I store the current value in the state store (‘prev_tuple’) so
that I can use it for the computation in the next cycle. As you may observe, I am not
using keyBy. I am simply printing out the resultant tuple.

It appears from the error message that I have to set the key serializer (and 
possibly value serializer) for the state store. I am not sure how to do that…

Thanks for your interest in helping,


Regards,
Buvana

public class stateful {
private static String INPUT_KAFKA_TOPIC = null;
private static int TIME_WINDOW = 0;

public static void main(String[] args) throws Exception {

if (args.length < 2) {
throw new IllegalArgumentException("The application needs two 
arguments. The first is the name of the kafka topic from which it has to \n"
+ "fetch the data. The second argument is the size of the 
window, in seconds, to which the aggregation function must be applied. \n");
}

INPUT_KAFKA_TOPIC = args[0];
TIME_WINDOW = Integer.parseInt(args[1]);

Properties properties = null;

properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStateBackend(new 
FsStateBackend("file://home/buvana/flink/checkpoints"));

DataStreamSource<String> stream = env
.addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new 
SimpleStringSchema(), properties));

// maps the data into Flink tuples
DataStream<Tuple2<String, Double>> streamTuples = stream.flatMap(new 
Rec2Tuple2());

// write the result to the console or in a Kafka topic
streamTuples.print();

env.execute("plus one");

}

public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String, Double>> {
private transient ValueState<Tuple2<String, Double>> prev_tuple;

@Override
public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
try {
Double value = Double.parseDouble(incString);
System.out.println("value = " + value);
Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
System.out.println(prev_stored_tp);

Double value2 = value - prev_stored_tp.f1;
prev_stored_tp.f1 = value;
prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
prev_tuple.update(prev_stored_tp);

Tuple2<String, Double> tp = new Tuple2<>();
tp.setField(INPUT_KAFKA_TOPIC, 0);
tp.setField(value2, 1);
out.collect(tp);

} catch (NumberFormatException e) {
System.out.println("Could not convert to Float" + incString);
System.err.println("Could not convert to Float" + incString);
}
}

@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<String, Double>> descriptor =
new ValueStateDescriptor<>(
"previous input value", // the state name
TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
Tuple2.of("test topic", 0.0)); // default value of 
the state, if nothing was set
prev_tuple = getRuntimeContext().getState(descriptor);
}
}
}

From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Thursday, August 11, 2016 5:45 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Hello Buvana,

Can you share a bit more details on your operator and how you are using it?
For example, are you using keyBy before using you custom operator?

Thanks a lot,
Kostas

On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) 
> 
wrote:

Hello,

I am utilizing the code snippet in: 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
 and particularly ‘open’ function in my code:
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if 
nothing was set
sum = 

Re: Strange behaviour of the flatMap Collector

2016-08-11 Thread Yassin Marzouki
Indeed, using the same parallelism corrected the output. Thank you!

On Thu, Aug 11, 2016 at 2:34 PM, Stephan Ewen  wrote:

> Hi!
>
> The source runs parallel (n tasks), but the sink has a parallelism of 1.
> The sink hence has to merge the parallel streams from the source, which
> happens based on arrival speed of the streams, i.e., its not deterministic.
> That's why you see the lines being mixed.
>
> Try running source and sink with the same parallelism, then no merge of
> streams needs to happen. You'll see then that per output file, the lines
> are correct.
>
> Stephan
>
>
> On Thu, Aug 11, 2016 at 2:29 PM, Yassin Marzouki 
> wrote:
>
>> Hi all,
>>
>> When I use out.collect() twice inside a faltMap, the output is sometimes
>> and randomly skewed. Take this example:
>>
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.cre
>> ateLocalEnvironment();
>> env.generateSequence(1, 10)
>> .flatMap((Long t, Collector out) -> {
>> out.collect("line1");
>> out.collect("line2");
>> })
>> .writeAsText("test",FileSystem.WriteMode.OVERWRITE).
>> setParallelism(1);
>> env.execute("Test");
>>
>> I expect the output to be
>> line1
>> line2
>> line1
>> line2
>> ...
>>
>> But some resulting lines (18 out of 20) were:
>> line2
>> line2
>> and the same for line1.
>>
>> What could be the reason for this?
>>
>> Best,
>> Yassine
>>
>
>


Re: specify user name when connecting to hdfs

2016-08-11 Thread Dong-iL, Kim
Hi.
In this case, I used a standalone cluster (AWS EC2) and I want to connect to a remote
HDFS machine (AWS EMR).
I registered the location of core-site.xml as below.
Does it need any other properties?



<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://…:8020</value>
  </property>
  <property>
    <name>hadoop.security.authentication</name>
    <value>simple</value>
  </property>
  <property>
    <name>hadoop.security.key.provider.path</name>
    <value>kms://:9700/kms</value>
  </property>
  <property>
    <name>hadoop.job.ugi</name>
    <value>hadoop</value>
  </property>
</configuration>

Thanks.

> On Aug 11, 2016, at 9:31 PM, Stephan Ewen  wrote:
> 
> Hi!
> 
> Do you register the Hadoop Config at the Flink Configuration?
> Also, do you use Flink standalone or on Yarn?
> 
> Stephan
> 
> On Tue, Aug 9, 2016 at 11:00 AM, Dong-iL, Kim  > wrote:
> Hi.
> I’m trying to set external hdfs as state backend.
> my os user name is ec2-user. hdfs user is hadoop.
> there is a permission denied exception.
> I wanna specify hdfs user name.
> I set hadoop.job.ugi in core-site.xml and HADOOP_USER_NAME on command line.
> but not works.
> what shall I do?
> thanks.
> 



Re: Strange behaviour of the flatMap Collector

2016-08-11 Thread Stephan Ewen
Hi!

The source runs parallel (n tasks), but the sink has a parallelism of 1.
The sink hence has to merge the parallel streams from the source, which
happens based on the arrival speed of the streams, i.e., it's not deterministic.
That's why you see the lines being mixed.

Try running source and sink with the same parallelism, then no merge of
streams needs to happen. You'll see then that per output file, the lines
are correct.

Stephan
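
One way to align them, reusing the snippet from the question (a sketch; here both the flatMap and the sink are forced to parallelism 1):

    env.generateSequence(1, 10)
        .flatMap((Long t, Collector<String> out) -> {
            out.collect("line1");
            out.collect("line2");
        })
        .setParallelism(1) // same parallelism as the sink below, so no merging of parallel streams
        .writeAsText("test", FileSystem.WriteMode.OVERWRITE)
        .setParallelism(1);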


On Thu, Aug 11, 2016 at 2:29 PM, Yassin Marzouki 
wrote:

> Hi all,
>
> When I use out.collect() twice inside a faltMap, the output is sometimes
> and randomly skewed. Take this example:
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.
> createLocalEnvironment();
> env.generateSequence(1, 10)
> .flatMap((Long t, Collector out) -> {
> out.collect("line1");
> out.collect("line2");
> })
> .writeAsText("test",FileSystem.WriteMode.
> OVERWRITE).setParallelism(1);
> env.execute("Test");
>
> I expect the output to be
> line1
> line2
> line1
> line2
> ...
>
> But some resulting lines (18 out of 20) were:
> line2
> line2
> and the same for line1.
>
> What could be the reason for this?
>
> Best,
> Yassine
>


Re: specify user name when connecting to hdfs

2016-08-11 Thread Stephan Ewen
Hi!

Do you register the Hadoop Config at the Flink Configuration?
Also, do you use Flink standalone or on Yarn?

Stephan

On Tue, Aug 9, 2016 at 11:00 AM, Dong-iL, Kim  wrote:

> Hi.
> I’m trying to set external hdfs as state backend.
> my os user name is ec2-user. hdfs user is hadoop.
> there is a permission denied exception.
> I wanna specify hdfs user name.
> I set hadoop.job.ugi in core-site.xml and HADOOP_USER_NAME on command line.
> but not works.
> what shall I do?
> thanks.


Strange behaviour of the flatMap Collector

2016-08-11 Thread Yassin Marzouki
Hi all,

When I use out.collect() twice inside a flatMap, the output is sometimes,
and seemingly at random, mixed up. Take this example:

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.generateSequence(1, 10)
.flatMap((Long t, Collector out) -> {
out.collect("line1");
out.collect("line2");
})

.writeAsText("test",FileSystem.WriteMode.OVERWRITE).setParallelism(1);
env.execute("Test");

I expect the output to be
line1
line2
line1
line2
...

But some resulting lines (18 out of 20) were:
line2
line2
and the same for line1.

What could be the reason for this?

Best,
Yassine


Re: Maintaining global variables. Best practices.

2016-08-11 Thread Stephan Ewen
Hi!

A global shared variable is not something that is offered by Flink right
now. It is not part of the system, because it is not really part of the
stream or state derived from individual streams. It is also quite hard to
do efficiently and in a general-purpose way.

I see that it is a useful tool in several use cases. You would currently
have to add that yourself: you can try to use something like Redis to hold
shared state, or you can use an embedded Akka and CRDTs to have replicated
state that syncs up across all nodes.

Greetings,
Stephan
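
A rough sketch of the Redis option (Jedis is an assumed external client library; host, port, and key name are placeholders):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import redis.clients.jedis.Jedis;

    public class GloballyCountingMap extends RichMapFunction<String, String> {
        private transient Jedis jedis;

        @Override
        public void open(Configuration parameters) {
            jedis = new Jedis("localhost", 6379); // assumed Redis endpoint
        }

        @Override
        public String map(String value) {
            long seen = jedis.incr("global-element-count"); // atomic, shared across all parallel instances
            return value + " (seen " + seen + " elements globally)";
        }

        @Override
        public void close() {
            if (jedis != null) {
                jedis.close();
            }
        }
    }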



On Wed, Aug 10, 2016 at 6:18 PM, Dima Arbuzin  wrote:

> Hi there,
>
> I'm working on a clustering algorithm on a DataStream based on this paper
>  and in order to
> implement that, I need to maintain some global values (initially a HashMap
> but possibly a DataSet) and update them on every arriving item.
>
> Is this even possible with Flink? (some kind of reverse broadcasting)
> And if yes, what would be the right way to do it? Or what are the
> alternatives?
>
> As a stream simulation I use a source function based on the demo
> 
> .
>
> Thanks,
>
>
> --
> Best regards,
> Dima Arbuzin
>


Re: Firing windows multiple times

2016-08-11 Thread Kostas Kloudas
Just to add a drawback to solution 2): you may have some issues because window
boundaries may not be aligned. For example, the elements of a day window may be
split between the last day of a month and the first day of the next month.

Kostas

> On Aug 11, 2016, at 2:21 PM, Kostas Kloudas  
> wrote:
> 
> Hi Shanon,
> 
> From what I understand, you want to have your results windowed by
> different durations, e.g. by minute, by day, by month, and you use the
> evictor to decide which elements should go into each window. If I am
> correct, then I do not think that you need the evictor, which forces you
> to keep all the elements that the operator has seen (because it uses a
> ListState).
> 
> In this case you can do one of the following:
> 
> 1) If you just want to have the big window (by month) and all the smaller
> ones to appear as early firings of the big one, then I would suggest going
> with a custom trigger. The trigger has access to watermarks, can register
> both event and processing time timers (so you can have firings whenever you
> want: per minute, per day, etc.), can have state (e.g. an element counter),
> and can decide to FIRE or FIRE_AND_PURGE.
> 
> The only downside is that all intermediate firings will appear to belong to 
> the big window. This means that the beginning and the end of the by-minute and 
> daily firings will be those of the month that they belong to. If this is not 
> a problem, I would go for that.
> 
> 2) If the above is a problem, then what you can do, is key your input stream 
> and then have 3 different windowing strategies, e.g. by minute, by day and by 
> month. This way you will also have the desired window boundaries. This would 
> look like:
> 
> keyedStream.timeWindow(byMonth).addSink …
> keyedStream.timeWindow(byDay).addSink …
> keyedStream.timeWindow(byMinute).addSink …
> 
> Please let us know if this answers your question and if you need any more 
> help.
> 
> Kostas
>  
>> On Aug 10, 2016, at 10:15 PM, Shannon Carey wrote:
>> 
>> Hi Aljoscha,
>> 
>> Yes, I am using an Evictor, and I think I have seen the problem you are 
>> referring to. However, that's not what I'm talking about.
>> 
>> If you re-read my first email, the main point is the following: if users 
>> desire updates more frequently than window watermarks are reached, then 
>> window state behaves suboptimally. It doesn't matter if there's an evictor 
>> or not. Specifically:
>> 
>> If I have a window "A" that I fire multiple times in order to provide 
>> incremental results as data comes in instead of waiting for the watermark to 
>> purge the window
>> And that window's events are gathered into another, bigger window "B"
>> And I want to keep only the latest event from each upstream window "A" (by 
>> timestamp, where each window pane has its own timestamp)
>> Even if I have a fold/reduce method on the bigger window "B" to make sure 
>> that each updated event from "A" overwrites the previous event (by timestamp)
>> Window "B" will hold in state all events from windows "A", including all the 
>> incremental events that were fired by processing-time triggers, even though 
>> I don't actually need those events because the reducer gets rid of them
>> 
>> An example description of execution flow:
>> Event x
>> Window A receives event, trigger waits for processing time delay, then emits 
>> event x(time=1, count=1)
>> Window B receives event, trigger waits for processing time delay, then 
>> executes fold() and emits event(time=1 => count=1), but internal Window 
>> state looks like [x(time=1, count=1)]
>> Event y
>> Window A receives event, trigger '', then emits event y(time=1, count=2)
>> Window B receives event, trigger '', then executes fold() and emits 
>> event(time=1 => count=2), but internal Window state looks like [x(time=1, 
>> count=1), y(time=1, count=2)]
>> Watermark z
>> Window A receives watermark, trigger's event timer is reached, fires and 
>> purges and emits current state as event z(time=1, count=2)
>> Window B receives event, trigger waits for processing time delay, then 
>> executes fold() and emits event(time=1 => count=2), but internal Window 
>> state looks like [x(time=1, count=1), y(time=1, count=2), z(time=1, count=2)]
>> As you can see, the internal window state continues to grow despite what 
>> fold() is doing.
>> 
>> Does that explanation help interpret my original email?
>> 
>> -Shannon
>> 
>> 
>> From: Aljoscha Krettek
>> Date: Wednesday, August 10, 2016 at 12:18 PM
>> To: user@flink.apache.org
>> Subject: Re: Firing windows multiple times
>> 
>> Hi,
>> from your mail I'm gathering that you are in fact using an Evictor, is that 
>> correct? If not, then the window operator should not keep all the elements 
>> ever received for a 

Re: Firing windows multiple times

2016-08-11 Thread Kostas Kloudas
Hi Shanon,

From what I understand, you want to have your results windowed by different
durations, e.g. by minute, by day, by month, and you use the evictor to decide
which elements should go into each window. If I am correct, then I do not
think that you need the evictor, which forces you to keep all the elements
that the operator has seen (because it uses a ListState).

In this case you can do one of the following:

1) If you just want to have the big window (by month) and all the smaller ones
to appear as early firings of the big one, then I would suggest going with a
custom trigger. The trigger has access to watermarks, can register both event
and processing time timers (so you can have firings whenever you want: per
minute, per day, etc.), can have state (e.g. an element counter), and can
decide to FIRE or FIRE_AND_PURGE.

The only downside is that all intermediate firings will appear to belong to the
big window. This means that the beginning and the end of the by-minute and daily
firings will be those of the month that they belong to. If this is not a
problem, I would go for that.

2) If the above is a problem, then what you can do is key your input stream
and then have 3 different windowing strategies, e.g. by minute, by day and by
month. This way you will also have the desired window boundaries. This would
look like:

keyedStream.timeWindow(byMonth).addSink …
keyedStream.timeWindow(byDay).addSink …
keyedStream.timeWindow(byMinute).addSink …
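
[Spelled out a little more as a sketch; Event, the key extractor, the window sizes and the sinks below are placeholders:]

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(key: String, value: Double)

def attachWindowedCounts(events: DataStream[Event],
                         byMinuteSink: SinkFunction[Long],
                         byDaySink: SinkFunction[Long],
                         byMonthSink: SinkFunction[Long]): Unit = {
  val keyedStream = events.keyBy(_.key)

  keyedStream.timeWindow(Time.minutes(1))
    .fold(0L)((count, _) => count + 1)
    .addSink(byMinuteSink)

  keyedStream.timeWindow(Time.days(1))
    .fold(0L)((count, _) => count + 1)
    .addSink(byDaySink)

  // calendar months have no fixed length, so Time.days(30) is only an
  // approximation; a true monthly window needs a custom WindowAssigner
  keyedStream.timeWindow(Time.days(30))
    .fold(0L)((count, _) => count + 1)
    .addSink(byMonthSink)
}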

Please let us know if this answers your question and if you need any more help.

Kostas
 
> On Aug 10, 2016, at 10:15 PM, Shannon Carey  wrote:
> 
> Hi Aljoscha,
> 
> Yes, I am using an Evictor, and I think I have seen the problem you are 
> referring to. However, that's not what I'm talking about.
> 
> If you re-read my first email, the main point is the following: if users 
> desire updates more frequently than window watermarks are reached, then 
> window state behaves suboptimally. It doesn't matter if there's an evictor or 
> not. Specifically:
> 
> If I have a window "A" that I fire multiple times in order to provide 
> incremental results as data comes in instead of waiting for the watermark to 
> purge the window
> And that window's events are gathered into another, bigger window "B"
> And I want to keep only the latest event from each upstream window "A" (by 
> timestamp, where each window pane has its own timestamp)
> Even if I have a fold/reduce method on the bigger window "B" to make sure 
> that each updated event from "A" overwrites the previous event (by timestamp)
> Window "B" will hold in state all events from windows "A", including all the 
> incremental events that were fired by processing-time triggers, even though I 
> don't actually need those events because the reducer gets rid of them
> 
> An example description of execution flow:
> Event x
> Window A receives event, trigger waits for processing time delay, then emits 
> event x(time=1, count=1)
> Window B receives event, trigger waits for processing time delay, then 
> executes fold() and emits event(time=1 => count=1), but internal Window state 
> looks like [x(time=1, count=1)]
> Event y
> Window A receives event, trigger '', then emits event y(time=1, count=2)
> Window B receives event, trigger '', then executes fold() and emits 
> event(time=1 => count=2), but internal Window state looks like [x(time=1, 
> count=1), y(time=1, count=2)]
> Watermark z
> Window A receives watermark, trigger's event timer is reached, fires and 
> purges and emits current state as event z(time=1, count=2)
> Window B receives event, trigger waits for processing time delay, then 
> executes fold() and emits event(time=1 => count=2), but internal Window state 
> looks like [x(time=1, count=1), y(time=1, count=2), z(time=1, count=2)]
> As you can see, the internal window state continues to grow despite what 
> fold() is doing.
> 
> Does that explanation help interpret my original email?
> 
> -Shannon
> 
> 
> From: Aljoscha Krettek
> Date: Wednesday, August 10, 2016 at 12:18 PM
> To: user@flink.apache.org
> Subject: Re: Firing windows multiple times
> 
> Hi,
> from your mail I'm gathering that you are in fact using an Evictor, is that 
> correct? If not, then the window operator should not keep all the elements 
> ever received for a window but only the aggregated result.
> 
> Side note, there seems to be a bug in EvictingWindowOperator that causes 
> evicted elements to not actually be removed from the state. They are only 
> filtered from the Iterable that is given to the WindowFunction. I opened a 
> Jira issue for that: https://issues.apache.org/jira/browse/FLINK-4369 
> 
> 
> Cheers,
> Aljoscha
> 
> On Wed, 10 Aug 2016 at 18:19 Shannon Carey  

Re: Flink : CEP processing

2016-08-11 Thread Aljoscha Krettek
Hi,
Sameet is right about the snapshotting. The CEP operator behaves more or
less like a FlatMap operator that keeps some more complex state internally.
Snapshotting works the same as with any other operator.

Cheers,
Aljoscha

On Thu, 11 Aug 2016 at 00:54 Sameer W  wrote:

> Mans,
>
> I think at this time we need someone who knows the internal implementation
> to answer definitively-
>
> My understanding is-
>
> 1. Internally CEP is like a map operator with session-like semantics
> operating in a pipeline. You could do what it does but you would have to
> implement all that. If you need support for negation today that is probably
> how you would do it.
> 2. Ultimately CEP produces a stream which you need to write to some sink.
> If your sink supports exactly-once semantics then your pipeline will support
> it. So I think snapshotting with CEP would be no different. If you send out
> events (alerts) from within your select(PatternSelectFunction) then yes,
> you could "send" your alerts multiple times. If instead you wrote to a sink
> (with exactly once semantics) which then sent alerts out in the real world
> you should not get those multiple alerts. I am sending alerts from within
> my PatternSelectFunction. So I am taking the chance of sending alerts twice
> which is ok for my use-case.
>
> I am operating under the belief (which seems logical to me) that CEP is
> like a stateful map operator at the end of my processing pipeline. Snapshotting
> would work exactly like it would in that case in CEP.
>
> Thanks,
> Sameer
>
>
> On Wed, Aug 10, 2016 at 2:42 PM, M Singh  wrote:
>
>> Thanks for the pointers Sameer.
>>
>>
>> The reason I wanted to find out about snapshotting with CEP is because I
>> thought that CEP state might also be snapshotted for recovery. If that is
>> the case, then events in the CEP might be included in two snapshots.
>>
>> Mans
>>
>>
>> On Tuesday, August 9, 2016 1:15 PM, Sameer W  wrote:
>>
>>
>> In one of the earlier thread Till explained this to me (
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CEP-and-Within-Clause-td8159.html
>> )
>>
>> 1. Within does not use time windows. It sort of uses session windows
>> where the session begins when the first event of the pattern is identified.
>> The timer starts when the "first" event in the pattern fires. If the
>> pattern completes "within" the designated times (meaning the "next" and
>> "followed by" fire as will "within" the time specified) you have a match or
>> else the window is removed. I don't know how it is implemented but I doubt
>> it stores all the events in memory for the "within" window (there is not
>> need to). It will only store the relevant events (first, next, followed by,
>> etc). So memory would not be an issue here. If two "first" type events are
>> identified I think two "within" sessions are created.
>>
>> 2. Snapshotting (I don't know much in this area so I cannot answer). Why
>> should it be different though? You are using operators and state. It should
>> work the same way. But I am not too familiar with that.
>>
>> 3. The "Within" window is not an issue. Even the window preceding that
>> should not be unless you are using WindowFunction (more memory friendly
>> alternative is
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions) by
>> themselves and using a really large window
>>
>> 4. The way I am using it, it is working fine. Some of the limitations I
>> have seen are related to this paper not being fully implemented 
>> (https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf). I
>> don't know how to support negation in an event stream but I don't need it
>> for now.
>>
>> Thanks,
>> Sameer
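
[A small sketch of point 1 in the quoted mail above, assuming the flink-cep-scala module; Event and the predicates are placeholders. The clock for within() starts when the "first" event of the pattern matches, not at fixed window boundaries:]

import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(key: String, kind: String)

def detect(input: DataStream[Event]): PatternStream[Event] = {
  val pattern = Pattern.begin[Event]("first").where(_.kind == "start")
    .followedBy("second").where(_.kind == "confirm")
    .within(Time.minutes(10)) // partial matches older than 10 minutes are discarded
  CEP.pattern(input.keyBy(_.key), pattern)
}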
>>
>>
>> On Tue, Aug 9, 2016 at 3:45 PM, M Singh  wrote:
>>
>> Hi Sameer:
>>
>> If we use a within window for event series -
>>
>> 1. Does it interfere with the default time windows?
>> 2. How does it affect snapshotting?
>> 3. If the window is too large, are the events stored in a "processor" until
>> the window expires?
>> 4. Are there any other known limitations and best practices of using CEP
>> with Flink?
>>
>> Thanks again for your help.
>>
>>
>>
>> On Tuesday, August 9, 2016 11:29 AM, Sameer Wadkar 
>> wrote:
>>
>>
>> In that case you need to get them into one stream somehow (keyBy a dummy
>> value for example). There is always some logical key to keyBy on when data
>> is arriving from multiple sources (e.g. some portion of the timestamp).
>>
>> You are looking for patterns within something (events happening around
>> the same time but arriving from multiple devices). That something should be
>> the key. That's how I am using it.
>>
>> Sameer
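
[A tiny sketch of that keying idea; Reading and the bucket size are placeholders:]

import org.apache.flink.streaming.api.scala._

case class Reading(deviceId: String, timestamp: Long, value: Double)

// Bucket by minute so events from many devices that happen around the same
// time share a key; keyBy(_ => "all") would also work but funnels everything
// through a single parallel instance.
def keyForPatterns(readings: DataStream[Reading]): KeyedStream[Reading, Long] =
  readings.keyBy(_.timestamp / 60000)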

Re: ValueState is missing

2016-08-11 Thread Dong-iL, Kim

my code and log are below.


val getExecuteEnv: StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(6)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 3))
    env
}

def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
    target.keyBy(_._3).flatMap(new StateOperator)

def main(args: Array[String]) {
    val env = getExecuteEnv
    val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
    val json = deserializeToJsonObj(source).name("ConvertToJson")
    val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
    val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
    …
}

class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
    var playerState: ValueState[util.Map[String, PotPlayer]] = _
    var handState: ValueState[HandHistoryInfo] = _

    override def open(param: Configuration): Unit = {
        val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
            classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
        playerState = getRuntimeContext.getState(playerValueStateDescriptor)
        handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
    }

    override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
        in._2 match {
            case "GameStartHistory" =>
                val players = playerState.value()
                val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
                val record = obj.asInstanceOf[GameStartHistoryRecord]
                val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
                if (LOG.isInfoEnabled())
                    LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
                ….
                playerState.update(players)
                handState.update(handHistoryInfo)
            case "HoleCardHistory" =>
                val players = playerState.value()
                if (players != null) {
                    ...
                    playerState.update(players)
                } else LOG.warn("there is no player[hole card]. {}", in._4)
            case "PlayerStateHistory" =>
                val players = playerState.value()
                if (players != null) {
                    ….
                    playerState.update(players)
                } else LOG.warn("there is no player[player state]. {}", in._4)
            case "CommCardHistory" =>
                val handHistoryInfo = handState.value()
                val commCardHistory: CommCardHistory = commCardState.value()
                if (handHistoryInfo != null) {
                    ...
                    handState.update(handHistoryInfo)
                    commCardState.update(commCardHistory)
                } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
            case "PlayerActionHistory" =>
                val handHistoryInfo = handState.value()
                val players = playerState.value()
                if (handHistoryInfo != null) {
                    ...
                } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
            case "PotHistory" =>
                val players = playerState.value()
                val handHistoryInfo = handState.value()
                val commCardHistory: CommCardHistory = commCardState.value()
                if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
                    ...
                } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
            case "GameEndHistory" =>
                val players = playerState.value()
                val handHistoryInfo = handState.value()
                ...
                if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
                playerState.clear()
                handState.clear()
            case _ =>
        }
    }

—— log —— 
2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, 

Re: ValueState is missing

2016-08-11 Thread Ufuk Celebi
What do you mean by "lost", exactly?

You call value() and it returns a value (!= null/defaultValue) and you
call it again and it returns null/defaultValue for the same key with
no update in between?

On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
 wrote:
> Hello,
>
> Could you share the code of the job you are running?
> With only this information I am afraid we cannot help much.
>
> Thanks,
> Kostas
>
>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim  wrote:
>>
>> Hi.
>> I’m using Flink 1.0.3 on AWS EMR.
>> Sporadically, the value of a ValueState is lost.
>> What is a good starting point for solving this problem?
>> Thank you.
>


Re: ValueState is missing

2016-08-11 Thread Kostas Kloudas
Hello,

Could you share the code of the job you are running?
With only this information I am afraid we cannot help much.

Thanks,
Kostas

> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim  wrote:
> 
> Hi.
> I’m using Flink 1.0.3 on AWS EMR.
> Sporadically, the value of a ValueState is lost.
> What is a good starting point for solving this problem?
> Thank you.



ValueState is missing

2016-08-11 Thread Dong-iL, Kim
Hi.
I’m using Flink 1.0.3 on AWS EMR.
Sporadically, the value of a ValueState is lost.
What is a good starting point for solving this problem?
Thank you.

[ANNOUNCE] Flink 1.1.1 Released

2016-08-11 Thread Ufuk Celebi
The Flink PMC is pleased to announce the availability of Flink 1.1.1.

The Maven artifacts published on Maven central for the previous 1.1.0
version had a Hadoop dependency issue. No Hadoop 1 specific version
(with version 1.1.0-hadoop1) was deployed and the 1.1.0 artifacts have
a dependency on Hadoop 1 instead of Hadoop 2.

This was fixed with this version, and we highly recommend updating all
Flink dependencies to version '1.1.1'. We are sorry for the
inconvenience this might have caused.

The release announcement:
http://flink.apache.org/news/2016/08/11/release-1.1.1.html

Release binaries:
http://www-us.apache.org/dist/flink/flink-1.1.1/