Re: Parallelism, registerEventTimeTimer and watermark problem

2017-10-17 Thread Fritz Budiyanto
Sorry, I forgot to paste the exception that was thrown:

10/17/2017 20:21:30 dropDetection -> (aggFlowDropDetectPrintln -> Sink: 
Unnamed, aggFlowDropDetectPrintln -> Sink: Unnamed, Sink: kafkaSink)(3/4) 
switched to CANCELED 
20:21:30,244 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Job Aggregate flows (313a46d5fd23e4c2d0d00d0033950b6d) switched from state 
FAILING to FAILED.
java.lang.NullPointerException: Keyed state can only be used on a 'keyed 
stream', i.e., after a 'keyBy()' operation.
at 
org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:151)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:115)
at 
FlowContractStitcherProcess.endState$lzycompute(FlowContractResolver.scala:30)
at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
at 
FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
at 
FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)


--
Fritz


> On Oct 17, 2017, at 7:55 PM, Fritz Budiyanto  wrote:
> 
> Hi All,
> 
> If I have high parallelism and use a ProcessFunction with registerEventTimeTimer, 
> the timer never fires.
> After debugging, I found out that the watermark isn't updated because I have 
> keyBy right after assignTimestampsAndWatermarks.
> And if I place assignTimestampsAndWatermarks right after the keyBy, an 
> exception is thrown.
> 
> val contractFlow = enrichedFlow
>  .keyBy(f => f.fiveTupleKey)
>  .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) <
>  .process(new FlowContractStitcherProcess)
>  .name("contractStitcher")
> 
> at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
>   at 
> FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
>   at 
> FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> 
> 
> Any idea how to solve my problem? How do I update the watermark after keyBy?
> 
> Would I hit scaling issues with a large number of timers if I use 
> 

Parallelism, registerEventTimeTimer and watermark problem

2017-10-17 Thread Fritz Budiyanto
Hi All,

If I have high parallelism and use a ProcessFunction with registerEventTimeTimer, 
the timer never fires.
After debugging, I found out that the watermark isn't updated because I have 
keyBy right after assignTimestampsAndWatermarks.
And if I place assignTimestampsAndWatermarks right after the keyBy, an exception 
is thrown.

 val contractFlow = enrichedFlow
  .keyBy(f => f.fiveTupleKey)
  .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) <
  .process(new FlowContractStitcherProcess)
  .name("contractStitcher")

at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
at 
FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
at 
FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)


Any idea how to solve my problem? How do I update the watermark after keyBy?

Would I hit scaling issues with a large number of timers if I use 
registerProcessingTimeTimer instead? I'm using event time throughout the 
pipeline; would mixing processing-time timers with event time cause problems 
down the line?
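
For reference, the variant that does not throw for me is assigning timestamps 
and watermarks before the keyBy; a rough sketch of what I currently have:

 val contractFlow = enrichedFlow
  .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) // before keyBy: no exception, but timers never fire
  .keyBy(f => f.fiveTupleKey)
  .process(new FlowContractStitcherProcess)
  .name("contractStitcher")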

--
Fritz

Re: Task Manager was lost/killed due to full GC

2017-10-17 Thread ShB
I just wanted to leave an update about this issue for anyone else who might
come across it. The problem was indeed resource-related, but it was disk
space, not heap/off-heap memory. YARN was killing my containers because they
exceeded the disk-utilization threshold, and this manifested as "Task manager
was lost/killed" or "JobClientActorConnectionTimeoutException: Lost connection
to the JobManager". Digging into the node manager logs of the individual
instances provided the hint that it was a disk issue.

Some fixes for this problem:
- yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
  can be increased to alleviate the problem temporarily.
- Increasing the disk capacity on each task manager is a more long-term fix.
- Increasing the number of task managers also increases the available disk
  space and hence is another fix.

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Stumped writing to KafkaJSONSink

2017-10-17 Thread Kenny Gorman
I am hoping you guys can help me. I am stumped about how to actually write to 
Kafka with a Kafka09JsonTableSink via the Table API. My code is below; I am 
hoping you can shed some light on how this should be done. I don't see any 
method that performs the actual write to Kafka. I am probably doing something 
stupid. TIA.

Thanks!
Kenny

// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
params.getRequired("write-topic"),
params.getProperties(),
partition);

result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do this, 
but no such method..

Maven release

2017-10-17 Thread Biswajit Das
Hi Team ,

Are there any instructions anywhere on how to publish a release? I have been
trying to publish a release to my own private Nexus repository, but it always
seems to try to upload to
https://repository.apache.org/service/local/staging/deploy/ even though I
tried setting *--settings* with my *settings.xml* and passing *-Durl*; somewhere
it always picks up the Apache Nexus repository. I don't see any distribution
entry in any POM.

Thank you .
Biswajit


Re: Split a dataset

2017-10-17 Thread Fabian Hueske
Unfortunately, it's not possible to bridge the gap between the DataSet and
DataStream APIs.

However, you can also use a CsvInputFormat in the DataStream API. Since
there's no built-in API to configure the CSV input, you would have to
create (and configure) the CsvInputFormat yourself.
Once you have the CsvInputFormat, you can create a DataStream using
StreamExecutionEnvironment.readFile(csvIF).
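
Not tested, but a rough Scala sketch of that approach might look like the
following; the file path and the two-column String/Long schema are just
placeholders:

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.RowCsvInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Field types of the CSV columns (hypothetical schema: a String and a Long)
val fieldTypes = Array[TypeInformation[_]](
  BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)

// Configure the input format manually, since there is no CSV builder for streams
val csvPath = "file:///path/to/input.csv"
val csvFormat = new RowCsvInputFormat(new Path(csvPath), fieldTypes)

// Row is not analyzable by the Scala type macro, so provide the type info explicitly
implicit val rowType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)

val rows: DataStream[Row] = env.readFile(csvFormat, csvPath)
rows.print()
env.execute("csv as stream")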

Hope this helps,
Fabian

2017-10-17 11:05 GMT+02:00 Magnus Vojbacke :

> Thank you, Fabian! If batch semantics are not important to my use case, is
> there any way to "downgrade" or convert a DataSet to a DataStream?
>
> BR
> /Magnus
>
> On 17 Oct 2017, at 10:54, Fabian Hueske  wrote:
>
> Hi Magnus,
>
> there is no Split operator on the DataSet API.
>
> As you said, this can be done using a FilterFunction. This also allows for
> non-binary splits:
>
> DataSet setToSplit = ...
> DataSet firstSplit = setToSplit.filter(new SplitCondition1());
> DataSet secondSplit = setToSplit.filter(new SplitCondition2());
> DataSet thirdSplit = setToSplit.filter(new SplitCondition3());
>
> where SplitCondition1, SplitCondition2, and SplitCondition3 are
> FilterFunction that filter out all records that don't belong to the split.
>
> Best, Fabian
>
> 2017-10-17 10:42 GMT+02:00 Magnus Vojbacke :
>
>> I'm looking for something like DataStream.split(), but for DataSets. I'd
>> like to split my streaming data so messages go to different parts of an
>> execution graph, based on arbitrary logic.
>>
>> DataStream.split() seems to be perfect, except that my source is a CSV
>> file, and I have only found built in functions for reading CSV files into a
>> DataSet.
>>
>> I've evaluated using DataSet.filter(), but as far as I can tell, that
>> only allows me to emulate a yes/no split. This is not ideal because it's
>> too coarse, and I would prefer a more fine grained split than that.
>>
>>
>> Do you have any suggestions on how I can achieve my arbitrary splitting
>> logic for a) DataSets in general, or b) CSV files?
>>
>>
>
>


Re: GROUP BY TUMBLE on ROW range

2017-10-17 Thread Fabian Hueske
Hi Stefano,

this is not supported in Flink's SQL and we would need new Group Window
functions (like TUMBLE) for this.
A TUMBLE_COUNT function would be somewhat similar to SESSION, which also
requires checks on the sorted neighboring rows to identify the window of a
row.
Such a function would first need to be added to Calcite and then integrated
with Flink.

A tumble count could also be expressed in plain SQL but wouldn't be very
intuitive. You would have to
- define an over window (maybe partitioned on some key) sorted on time with
a ROW_NUMBER function that assigns increasing numbers to rows.
- do a group by on the row number modulo the window size.

Btw. count windows are supported by the Table API.

Best, Fabian



2017-10-17 17:16 GMT+02:00 Stefano Bortoli :

> Hi all,
>
>
>
> Is there a way to use a tumble window group by with row range in streamSQL?
>
>
>
> I mean, something like this:
>
>
>
> //  "SELECT COUNT(*) " +
>
> // "FROM T1 " +
>
> //"GROUP BY TUMBLE(rowtime, INTERVAL '2' ROWS PRECEDING )"
>
>
>
> However, even looking at tests and looking at the “row interval expression
> generation” I could not find any examples in SQL. I know it is supported by
> the stream APIs, and countWindow is the chosen abstraction.
>
>
>
> table
>
>   .window(Tumble over 2.rows on 'long as 'w)
>
>   .groupBy('w)
>
>   .select('int.count)
>
>   .toDataSet[Row]
>
>
>
> I fear I am missing something simple. Thanks a lot for the support guys!
>
>
>
> Best,
>
> Stefano
>


Re: Empty directories left over from checkpointing

2017-10-17 Thread Elias Levy
Stephan,

Thanks for taking care of this.  We'll give it a try once 1.4 drops.

On Sat, Oct 14, 2017 at 1:25 PM, Stephan Ewen  wrote:

> Some updates on this:
>
> Aside from reworking how the S3 directory handling is done, we also looked
> into supporting S3 different than we currently do. Currently support goes
> strictly through Hadoop's S3 file systems, which we need to change, because
> we want it to be possible to use Flink without Hadoop dependencies.
>
> In the next release, we will have S3 file systems without Hadoop
> dependency:
>
>   - One implementation wraps and shades a newer version of s3a. For
> compatibility with current behavior.
>
>   - The second is interesting for this directory problem: It uses Presto's
> S3 support, which is a bit different from Hadoop's s3n and s3a. It does not
> create empty directory marker files, hence it is not trying to make S3 look
> as much like a file system as s3a and s3n do, but that is actually an
> advantage for checkpointing. With that implementation, the issue mentioned
> here should not exist.
>
> Caveat: The new file systems and their aggressive shading still need to be
> tested at scale, but we are happy to take any feedback on this.
>
> Merged as of
> https://github.com/apache/flink/commit/991af3652479f85f732cbbade46bed7df1c5d819
>
> You can use them by simply dropping the respective JARs from "/opt" into
> "/lib" and using the file system scheme "s3://".
> The configuration is as in Hadoop/Presto, but you can drop the config keys
> into the Flink configuration - they will be forwarded to the Hadoop
> configuration.
>
> Hope that this makes the S3 use a lot easier and more fun...
>
>
> On Wed, Sep 20, 2017 at 2:49 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> We recently removed some cleanup code, because it involved checking some
>> store meta data to check when we can delete a directory. For certain stores
>> (like S3), requesting this meta data whenever we delete a file was so
>> expensive that it could bring down the job because removing state could not
>> be processed fast enough. We have a temporary fix in place now, so that
>> jobs at large scale can still run reliably on stores like S3. Currently,
>> this comes at the cost of not cleaning up directories but we are clearly
>> planning to introduce a different mechanism for directory cleanup in the
>> future that is not as fine grained as doing meta data queries per file
>> delete. In the meantime, unfortunately the best way is to cleanup empty
>> directories with some external tool.
>>
>> Best,
>> Stefan
>>
>> Am 20.09.2017 um 01:23 schrieb Hao Sun :
>>
>> Thanks Elias! Seems like there is no better answer than "do not care
>> about them now", or delete with a background job.
>>
>> On Tue, Sep 19, 2017 at 4:11 PM Elias Levy 
>> wrote:
>>
>>> There are a couple of related JIRAs:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-7587
>>> https://issues.apache.org/jira/browse/FLINK-7266
>>>
>>>
>>> On Tue, Sep 19, 2017 at 12:20 PM, Hao Sun  wrote:
>>>
 Hi, I am using RocksDB and S3 as storage backend for my checkpoints.
 Can flink delete these empty directories automatically? Or I need a
 background job to do the deletion?

 I know this has been discussed before, but I could not get a concrete
 answer for it yet. Thanks

 

>>>
>>>
>>
>


GROUP BY TUMBLE on ROW range

2017-10-17 Thread Stefano Bortoli
Hi all,

Is there a way to use a tumble window group by with row range in streamSQL?

I mean, something like this:

//  "SELECT COUNT(*) " +
// "FROM T1 " +
//"GROUP BY TUMBLE(rowtime, INTERVAL '2' ROWS PRECEDING )"

However, even looking at tests and looking at the "row interval expression 
generation" I could not find any examples in SQL. I know it is supported by the 
stream APIs, and countWindow is the chosen abstraction.

table
  .window(Tumble over 2.rows on 'long as 'w)
  .groupBy('w)
  .select('int.count)
  .toDataSet[Row]

I fear I am missing something simple. Thanks a lot for the support guys!

Best,
Stefano


Re: hadoopcompatibility not in dist

2017-10-17 Thread eSKa
Bumping up this issue, as I have a similar problem now.

We are running Flink on YARN and trying to submit a job via the Java API using
YarnClusterClient (the run method with a PackagedProgram). The job starts to
execute (we can see it on the Dashboard) but fails with this error:


Caused by: java.lang.RuntimeException: Could not load the TypeInformation
for the class 'org.apache.hadoop.io.Writable'. You may be missing the
'flink-hadoop-compatibility' dependency.
at
org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2143)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1774)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1716)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:953)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1173)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:886)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:966)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:828)
at
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:622)
at
org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:188)
at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)



When I run the same job from the command line on the machine where Flink is
installed, the job runs fine (we previously had the same error, but adding the
jar to the ./lib/ directory solved the issue).





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Split a dataset

2017-10-17 Thread Magnus Vojbacke
Thank you, Fabian! If batch semantics are not important to my use case, is 
there any way to "downgrade" or convert a DataSet to a DataStream?

BR
/Magnus

> On 17 Oct 2017, at 10:54, Fabian Hueske  wrote:
> 
> Hi Magnus,
> 
> there is no Split operator on the DataSet API.
> 
> As you said, this can be done using a FilterFunction. This also allows for 
> non-binary splits:
> 
> DataSet setToSplit = ...
> DataSet firstSplit = setToSplit.filter(new SplitCondition1());
> DataSet secondSplit = setToSplit.filter(new SplitCondition2());
> DataSet thirdSplit = setToSplit.filter(new SplitCondition3());
> 
> where SplitCondition1, SplitCondition2, and SplitCondition3 are 
> FilterFunction that filter out all records that don't belong to the split.
> 
> Best, Fabian
> 
> 2017-10-17 10:42 GMT+02:00 Magnus Vojbacke  >:
> I'm looking for something like DataStream.split(), but for DataSets. I'd like 
> to split my streaming data so messages go to different parts of an execution 
> graph, based on arbitrary logic.
> 
> DataStream.split() seems to be perfect, except that my source is a CSV file, 
> and I have only found built in functions for reading CSV files into a DataSet.
> 
> I've evaluated using DataSet.filter(), but as far as I can tell, that only 
> allows me to emulate a yes/no split. This is not ideal because it's too 
> coarse, and I would prefer a more fine grained split than that.
> 
> 
> Do you have any suggestions on how I can achieve my arbitrary splitting logic 
> for a) DataSets in general, or b) CSV files?
> 
> 



Re: Unbalanced job scheduling

2017-10-17 Thread AndreaKinn
I'm in contact with the author of the library to deal with the problem. I'm
also trying to understand how to implement slot sharing groups myself.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Split a dataset

2017-10-17 Thread Fabian Hueske
Hi Magnus,

there is no Split operator on the DataSet API.

As you said, this can be done using a FilterFunction. This also allows for
non-binary splits:

DataSet setToSplit = ...
DataSet firstSplit = setToSplit.filter(new SplitCondition1());
DataSet secondSplit = setToSplit.filter(new SplitCondition2());
DataSet thirdSplit = setToSplit.filter(new SplitCondition3());

where SplitCondition1, SplitCondition2, and SplitCondition3 are
FilterFunction that filter out all records that don't belong to the split.
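
As a rough, self-contained Scala sketch of that idea (the modulo conditions
are just placeholders for your own split logic):

import org.apache.flink.api.scala._

object SplitByFilter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val setToSplit: DataSet[Int] = env.fromElements(1, 2, 3, 4, 5, 6)

    // Each "split" is just a filter with its own condition; the conditions
    // do not have to be binary or even disjoint.
    val firstSplit  = setToSplit.filter(_ % 3 == 0)
    val secondSplit = setToSplit.filter(_ % 3 == 1)
    val thirdSplit  = setToSplit.filter(_ % 3 == 2)

    // print() triggers a separate execution for each split in this sketch.
    firstSplit.print()
    secondSplit.print()
    thirdSplit.print()
  }
}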

Best, Fabian

2017-10-17 10:42 GMT+02:00 Magnus Vojbacke :

> I'm looking for something like DataStream.split(), but for DataSets. I'd
> like to split my streaming data so messages go to different parts of an
> execution graph, based on arbitrary logic.
>
> DataStream.split() seems to be perfect, except that my source is a CSV
> file, and I have only found built in functions for reading CSV files into a
> DataSet.
>
> I've evaluated using DataSet.filter(), but as far as I can tell, that only
> allows me to emulate a yes/no split. This is not ideal because it's too
> coarse, and I would prefer a more fine grained split than that.
>
>
> Do you have any suggestions on how I can achieve my arbitrary splitting
> logic for a) DataSets in general, or b) CSV files?
>
>


Split a dataset

2017-10-17 Thread Magnus Vojbacke
I'm looking for something like DataStream.split(), but for DataSets. I'd like 
to split my streaming data so messages go to different parts of an execution 
graph, based on arbitrary logic.

DataStream.split() seems to be perfect, except that my source is a CSV file, 
and I have only found built in functions for reading CSV files into a DataSet.

I've evaluated using DataSet.filter(), but as far as I can tell, that only 
allows me to emulate a yes/no split. This is not ideal because it's too coarse, 
and I would prefer a more fine grained split than that.


Do you have any suggestions on how I can achieve my arbitrary splitting logic 
for a) DataSets in general, or b) CSV files?



Re: Unbalanced job scheduling

2017-10-17 Thread Fabian Hueske
Setting the slot sharing group is Flink's mechanism to solve this issue.
I'd consider this a limitation of the library that provides LEARN and
SELECT.

Did you consider opening an issue with (or contributing to) the library to
support setting the slot sharing group?

2017-10-17 9:38 GMT+02:00 AndreaKinn :

> Yes, I considered them, but unfortunately I can't call the setSlotSharingGroup
> method on the LEARN and SELECT operators.
>
> I can call it on the other operators, but this means that the two LEARN
> operators will be constrained to the same "unnamed" slot sharing group.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Case Class TypeInformation

2017-10-17 Thread Fabian Hueske
Hi Joshua,

that's a limitation of the Scala API.
Row requires an explicitly specified TypeInformation[Row], but it is not
possible to inject custom types into a CaseClassTypeInfo, which is
automatically generated by a Scala compiler plugin.

The probably easiest solution is to use Flink's Java Tuple classes instead
of a case class.

You can import the Java Tuples with
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

And create a TupleTypeInfo for example with
new TupleTypeInfo(new RowTypeInfo(Types.STRING, Types.LONG), Types.DOUBLE)
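
Putting those two pieces together, an untested sketch could look like this
(using BasicTypeInfo constants for the field types; the (String, Long) row
schema and the Double payload are just examples):

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
import org.apache.flink.types.Row

// Explicit type information for a JTuple2[Row, Double] that replaces the case class
implicit val tupleType: TypeInformation[JTuple2[Row, java.lang.Double]] =
  new TupleTypeInfo[JTuple2[Row, java.lang.Double]](
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO),
    BasicTypeInfo.DOUBLE_TYPE_INFO)

// With this implicit in scope (next to the usual org.apache.flink.api.scala._
// import), operations producing JTuple2[Row, java.lang.Double] pick up this
// type info instead of falling back to a GenericType, so its fields can be
// used as keys.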

Best, Fabian


2017-10-16 23:26 GMT+02:00 Joshua Griffith :

> Correction: I have the row’s RowTypeInfo at runtime before the job starts.
> I don’t have RowTypeInfo at compile time.
>
> On Oct 16, 2017, at 4:15 PM, Joshua Griffith 
> wrote:
>
> Hello,
>
> I have a case class that wraps a Flink Row and I’d like to use fields from
> that Row in a delta iteration join condition. I only have the row’s fields
> after the job starts. I can construct RowTypeInfo for the Row but I’m not
> sure how to add that to Flink’s generated type information for the case
> class. Without it, I understandably get the following error because Flink
> doesn’t know the Row’s TypeInformation:
>
> org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType) cannot be used as key.
>
>
> Is there a way to manually construct or annotate the type information for
> the case class to provide the Row’s type information so it can be used in a
> join? I could alternately replace the case class with a Tuple and construct
> a TupleTypeInfo but a tuple is more difficult to use than a case class.
>
> Thanks,
>
> Joshua
>
>
>


Re: Unbalanced job scheduling

2017-10-17 Thread AndreaKinn
Yes, I considered them, but unfortunately I can't call the setSlotSharingGroup
method on the LEARN and SELECT operators.

I can call it on the other operators, but this means that the two LEARN
operators will be constrained to the same "unnamed" slot sharing group.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Unbalanced job scheduling

2017-10-17 Thread Fabian Hueske
Hi Andrea,

have you looked into assigning slot sharing groups [1]?

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
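
For illustration, a rough (untested) Scala sketch of the mechanism; the group
name and the heavy map standing in for LEARN are made up:

import org.apache.flink.streaming.api.scala._

object SlotSharingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = env.fromElements(1, 2, 3, 4)

    // Lightweight operators stay in the default slot sharing group.
    val prepared = source.map(_ * 2)

    // A heavy operator can be isolated in its own group ("learn-1" is a
    // made-up name), so it does not share slots with the rest of the pipeline.
    val heavy = prepared
      .map { x => Thread.sleep(10); x }   // stand-in for the heavy LEARN step
      .slotSharingGroup("learn-1")

    heavy.print()
    env.execute("slot sharing sketch")
  }
}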

2017-10-16 18:01 GMT+02:00 AndreaKinn :

> Hi all,
> I want to expose you my program flow.
>
> I have the following operators:
>
> kafka-source -> timestamp-extractor -> map -> keyBy -> window -> apply ->
> LEARN -> SELECT -> process -> cassandra-sink
>
> the LEARN and SELECT operators belong to an external library supported by
> flink. LEARN is a very heavy operation compared to the other operators.
>
> Unfortunately LEARN has a max parallelism of 1, so if I have a cluster of 2
> TMs with 1 slot each and I set parallelism = 2, I will have one TM which
> runs one parallel instance of all the operators plus the single instance
> of LEARN, while the other TM runs just the second parallel instance of all
> the operators (clearly there is no further instance of LEARN).
> That's ok and I have no problem with understanding it.
>
> *** The problem:
> Actually I have 2 identical flows like this because it matches a situation
> where I have two sensor streams so really I have 2 LEARN operators
> corresponding to two independent streams.
>
> By the way, I noted that even in this case I have one TM which takes the load
> of the parallel instances of all the operators AND the single instances of
> LEARN-1 and LEARN-2, while the other TM runs just the second parallel
> instance of all the operators (no LEARN instances here).
>
> Since LEARN is a heavy operator, this leads to a very unbalanced load on the
> cluster, so much so that the first TM is killed during execution (looking
> at the logs, it probably happens because it does not have enough memory; in
> fact the sink execution is very, very slow, and it seems like LEARN is a
> bottleneck).
>
> Honestly, I can't understand why Flink doesn't assign one LEARN operator to
> one TM and the other LEARN to the other TM.
> This doesn't let me stress the cluster properly, because I will always have
> one TM super busy and the other one quite "free" and unstressed.
>
> Bye,
> Andrea
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>