Flink threads cpu busy/stuck at NoFetchingInput.require (Versions 1.2 and 1.4)

2018-08-19 Thread Mehar Simhadri (mesimhad)
Hi Flink Community,

In our Flink deployments, we see that some Flink threads become CPU busy/stuck
after a few hours, with the stack below:

"Sink:AggregationSink (2/4)" #567 daemon prio=5 os_prio=0 
tid=0x7f901dc97000 nid=0x254 runnable [0x7f8fe017f000]
java.lang.Thread.State: RUNNABLE
at 
org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:70)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:187)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:40)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:147)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
at java.lang.Thread.run(Thread.java:745)

A few observations from the stack while it is stuck:
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord
nonSpanningRemaining = 13

All these 13 bytes were read even before reaching 
com.esotericsoftware.kryo.io.Input.readVarInt

We are wondering whether this is a serialization bug or memory segment corruption.

Any pointers on how to debug further will be much appreciated.

Regards,
Mehar


Re: processWindowFunction

2018-08-19 Thread antonio saldivar
Thank you for the references.

I now have my ProcessFunction and I am getting the state, but how can I
group the elements by the threshold times? Also, since this is a global
window, how do I purge it? Otherwise it is going to keep growing.

On Sun, Aug 19, 2018 at 8:57, vino yang ()
wrote:

> Hi antonio,
>
> Regarding your scenario, I think you could consider using the ProcessFunction
> (or KeyedProcessFunction) directly on the stream. [1]
> It lets you handle each of your elements together with a Timer, and you can
> combine it with Flink's state API [2] to store your data.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#process-function-low-level-operations
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#working-with-state
>
> Thanks, vino.
>
> antonio saldivar wrote on Sun, Aug 19, 2018 at 10:18 AM:
>
>> hi Vino
>>
>> Is it possible to use a global window and then set the trigger onElement
>> to compare the arriving element against, for example, 10, 20 and 60 minutes
>> of data?
>>
>> I have rules evaluating the sum of amount over 10, 20 or 60 minutes for the
>> same keyed element. If the same id sums to, say, $200 total within one of
>> those thresholds and the count is greater than or equal to 3, I need to set
>> some values on the object; if the object does not reach those thresholds I
>> do not set the values, and I keep sending the output with or without those
>> values.
>>
>> Just processing the object on the fly and sending the output.
>>
>> On Fri, Aug 17, 2018 at 22:14, vino yang ()
>> wrote:
>>
>>> Hi antonio,
>>>
>>> Yes, ProcessWindowFunction is a very low-level window function.
>>> It allows you to access the data in the window and to customize the output
>>> of the window.
>>> So while it gives you flexibility, you also need to take care of more things
>>> yourself, which may require you to write more processing logic.
>>>
>>> Generally speaking, sliding windows usually produce some data that is
>>> repeated, but a common pattern is to apply a reduce function on the window
>>> to get your calculation results.
>>> If you only send the data on, there will definitely be some duplication.
>>>
>>> Thanks, vino.
>>>
>>> antonio saldivar wrote on Fri, Aug 17, 2018 at 12:01 PM:
>>>
 Hi Vino
 Thank you for the information. Actually I am using a trigger alert and
 a processWindowFunction to send my results, but when my window slides or ends
 it sends the objects again and I am getting duplicated data.

 On Thu, Aug 16, 2018 at 22:05, vino yang ()
 wrote:

> Hi Antonio,
>
> What results do you not want to get when each window is created?
> Examples of the use of ProcessWindowFunction are included in many test
> files in Flink's project, such as SideOutputITCase.scala or
> WindowTranslationTest.scala.
>
> For more information on ProcessWindowFunction, you can refer to the
> official website.[1]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
>
> Thanks, vino.
>
> antonio saldivar wrote on Fri, Aug 17, 2018 at 6:24 AM:
>
>> Hello
>>
>> I am implementing a data stream where I use sliding windows, but I am
>> stuck because I need to set values on my object based on some if statements
>> in my process function and send the object to the next step, but I don't
>> want results every time a window is created.
>>
>> if anyone has a good example on this that can help me
>>
>


Re: classloading strangeness with Avro in Flink

2018-08-19 Thread vino yang
Hi Cliff,

My personal guess is that this may be caused by a conflict between the job's
Avro and the Avro that the Flink framework itself relies on.
Flink provides some configuration parameters which allow you to determine the
resolution order of the classloaders yourself. [1]
Alternatively, you can debug the classloading by following the
documentation. [2]

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
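
For illustration, the option referred to in [1] is classloader.resolve-order in
flink-conf.yaml; it defaults to child-first in the Flip-6 mode, and switching it
is a quick experiment when a dependency in the job jar clashes with one shipped
in lib (a sketch only, consult [1] for the authoritative description):

# flink-conf.yaml -- assumes a Flink 1.5/1.6 style configuration
classloader.resolve-order: parent-first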

Thanks, vino.

Cliff Resnick wrote on Mon, Aug 20, 2018 at 10:40 AM:

> Our Flink/YARN pipeline has been reading Avro from Kafka for a while now.
> We just introduced a source of Avro OCF (Object Container Files) read from
> S3. The Kafka Avro continued to decode without incident, but the OCF files
> failed 100% with anomalous parse errors in the decoding phase after the
> schema and codec were successfully read from them. The pipeline would work
> on my laptop, and when I submitted a test Main program to the Flink Session
> in YARN, that would also successfully decode. Only the actual pipeline run
> from the TaskManager failed. At one point I even remote debugged the
> TaskManager process and stepped through what looked like a normal Avro
> decode (if you can describe Avro code as normal!) -- until it abruptly
> failed with an int decode or what-have-you.
>
> This stumped me for a while, but I finally tried moving flink-avro.jar
> from the lib to the application jar, and that fixed it. I'm not sure why
> this is, especially since there were no typical classloader-type errors.
> This issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.
>
> -Cliff
>


classloading strangeness with Avro in Flink

2018-08-19 Thread Cliff Resnick
Our Flink/YARN pipeline has been reading Avro from Kafka for a while now.
We just introduced a source of Avro OCF (Object Container Files) read from
S3. The Kafka Avro continued to decode without incident, but the OCF files
failed 100% with anomalous parse errors in the decoding phase after the
schema and codec were successfully read from them. The pipeline would work
on my laptop, and when I submitted a test Main program to the Flink Session
in YARN, that would also successfully decode. Only the actual pipeline run
from the TaskManager failed. At one point I even remote debugged the
TaskManager process and stepped through what looked like a normal Avro
decode (if you can describe Avro code as normal!) -- until it abruptly
failed with an int decode or what-have-you.

This stumped me for a while, but I finally tried moving flink-avro.jar from
the lib to the application jar, and that fixed it. I'm not sure why this
is, especially since there were no typical classloader-type errors.  This
issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.

-Cliff


Re: Implement Joins with Lookup Data

2018-08-19 Thread Hequn Cheng
Hi Harshvardhan,

Have you ever considered adding a cache for the database lookups, so that you
don't have to add so many pipelines and also don't have to do the window
distinct?
The cache can be an LRU cache with a size and an expiry time specified.
If your data is limited, it can also be an all-data cache, which can be
updated, say every 2 hours, according to your requirements.

Adding a cache can not only simplify your pipeline but also improve job
performance.
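
A minimal sketch of the LRU idea, assuming a Guava cache inside a
RichMapFunction (Position, Product, EnrichedPosition and ProductDao are
hypothetical placeholders, not real APIs):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.concurrent.TimeUnit;

// Sketch only: Position, Product, EnrichedPosition and ProductDao stand in
// for the real domain types and DB client.
public class CachedProductEnricher extends RichMapFunction<Position, EnrichedPosition> {

    private transient Cache<Long, Product> cache;
    private transient ProductDao dao;

    @Override
    public void open(Configuration parameters) {
        dao = new ProductDao(); // however the DB client is created
        cache = CacheBuilder.newBuilder()
                .maximumSize(10_000)                  // bounded size
                .expireAfterWrite(2, TimeUnit.HOURS)  // expiry, per the suggestion above
                .build();
    }

    @Override
    public EnrichedPosition map(Position position) throws Exception {
        // Check the cache first; only hit the DB on a miss.
        Product product = cache.get(position.getProductKey(),
                () -> dao.loadProduct(position.getProductKey()));
        return new EnrichedPosition(position, product);
    }
}

The same cache could also live inside an AsyncFunction used with
AsyncDataStream if blocking the task thread is a concern.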

Best, Hequn


On Mon, Aug 20, 2018 at 5:42 AM, Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Hello Everyone,
>
> Sorry for the delayed response.
> This is what I am thinking of doing. We are thinking of creating 2
> pipelines. The first one only enriches the position data with product
> information. The second pipeline will use the enriched position and get all
> the account information for performing aggregations.
>
> *First Pipeline*:
> 1) Get the positions from Kafka and window data into tumbling windows of
> 30 seconds.
> 2) We perform a rolling aggregation that basically collects all the unique
> product keys in a set.
> 3) At the end of the window, we have a process function that queries an
> external service that performs a single lookup for all the unique products
> we have seen in the window.
> 4) Persist the enriched positions to Kafka topic T1. There is a sink
> process that reads from this Kafka topic (T1), writes to an underlying DB,
> and persists to another Kafka topic (T2) for the second pipeline to read from.
>
> *Second Pipeline*
> 1) Reads from topic T2 that contains enriched position.
> 2) For each position, we get the account information and lookup all the
> parent and child accounts associated with that account.
> 3) Once we have all the accounts, we lookup all the enriched positions
> that were created from the first pipeline for those accounts.
> 4) We perform the final aggregation to, say, calculate the Net Asset Value
> for the account.
> 5) Persist the output to the DB.
>
> Regards,
> Harsh
>
> On Wed, Jul 25, 2018 at 6:52 PM ashish pok  wrote:
>
>> Hi Michael,
>>
>> We are currently using 15 TMs with 4 cores and 4 slots each, 10GB of
>> memory on each TM. We have 15 partitions on Kafka for stream and 6 for
>> context/smaller stream. Heap is around 50%, GC is about 150ms and CPU loads
>> are low. We may be able to reduce resources on this if need be.
>>
>> Thanks,
>>
>>
>> - Ashish
>>
>> On Wednesday, July 25, 2018, 4:07 AM, Michael Gendelman 
>> wrote:
>>
>> Hi Ashish,
>>
>> We are planning for a similar use case and I was wondering if you can
>> share the amount of resources you have allocated for this flow?
>>
>> Thanks,
>> Michael
>>
>> On Tue, Jul 24, 2018, 18:57 ashish pok  wrote:
>>
>> BTW,
>>
>> We got around bootstrap problem for similar use case using a “nohup”
>> topic as input stream. Our CICD pipeline currently passes an initialize
>> option to app IF there is a need to bootstrap and waits for X minutes
>> before taking a savepoint and restart app normally listening to right
>> topic(s). I believe there is work underway to handle this gracefully using
>> Side Input as well. Other than determining X minutes for initialization to
>> complete, we haven't had any issue with this solution - we have over 40
>> million states refreshes daily and close to 200Mbps input streams being
>> joined to states.
>>
>> Hope this helps!
>>
>>
>>
>> - Ashish
>>
>> On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <
>> fearsome.lucid...@gmail.com> wrote:
>>
>> Alas, this suffers from the bootstrap problem.  At the moment Flink does
>> not allow you to pause a source (the positions), so you can't fully consume
>> and preload the accounts or products to perform the join before the
>> positions start flowing.  Additionally, Flink SQL does not support
>> materializing an upsert table for the accounts or products to perform the
>> join, so you have to develop your own KeyedProcessFunction, maintain the
>> state, and perform the join on your own if you only want to join against
>> the latest value for each key.
>>
>> On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann 
>> wrote:
>>
>> Yes, using Kafka which you initialize with the initial values and then
>> feed changes to the Kafka topic from which you consume could be a solution.
>>
>> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
>> harshvardhan.ag...@gmail.com> wrote:
>>
>> Hi Till,
>>
>> How would we do the initial hydration of the Product and Account data
>> since it’s currently in a relational DB? Do we have to copy over data to
>> Kafka and then use them?
>>
>> Regards,
>> Harsh
>>
>> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:
>>
>> Hi Harshvardhan,
>>
>> I agree with Ankit that this problem could actually be solved quite
>> elegantly with Flink's state. If you can ingest the product/account
>> information changes as a stream, you can keep the latest version of it in
>> Flink state by using a co-map function [1, 2]. One input of the co-map
>> function would be 

Re: What's the advantage of using BroadcastState?

2018-08-19 Thread Paul Lam
Hi Rong, Hequn

Your answers are very helpful! Thank you!

Best Regards,
Paul Lam

> On Aug 19, 2018, at 23:30, Rong Rong wrote:
> 
> Hi Paul,
> 
> To add to Hequn's answer. Broadcast state can typically be used as "a 
> low-throughput stream containing a set of rules which we want to evaluate 
> against all elements coming from another stream" [1] 
> So an addition to the difference list is: whether it is "broadcast" across all 
> keys when processing a keyed stream. This is typically useful when it is not 
> possible to derive the same key field using a KeySelector on the co-stream.
> Another difference is performance: the BroadcastStream is "stored locally and 
> is used to process all incoming elements on the other stream", so you need to 
> carefully manage the size of the BroadcastStream.
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
>  
> 
> On Sun, Aug 19, 2018 at 1:40 AM Hequn Cheng  > wrote:
> Hi Paul,
> 
> There are some differences:
> 1. The BroadcastStream can broadcast data for you, i.e., data will be 
> broadcast to all downstream tasks automatically. 
> 2. To guarantee that the contents in the Broadcast State are the same across 
> all parallel instances of our operator, read-write access is only given to 
> the broadcast side
> 3. For BroadcastState, Flink guarantees that upon restoring/rescaling there 
> will be no duplicates and no missing data. In case of recovery with the same 
> or smaller parallelism, each task reads its checkpointed state. Upon scaling 
> up, each task reads its own state, and the remaining tasks (p_new-p_old) read 
> checkpoints of previous tasks in a round-robin manner. While MapState doesn't 
> have such abilities.
> 
> Best, Hequn
> 
> On Sun, Aug 19, 2018 at 11:18 AM, Paul Lam  > wrote:
> Hi, 
> 
> AFAIK, the difference between a BroadcastStream and a normal DataStream is 
> that the BroadcastStream comes with a BroadcastState, but it seems that the 
> functionality of BroadcastState can also be achieved by MapState in a 
> CoMapFunction or something since the control stream is still broadcasted 
> without being turned into BroadcastStream. So, I’m wondering what’s the 
> advantage of using BroadcastState? Thanks a lot!
> 
> Best Regards,
> Paul Lam
> 



Re: Implement Joins with Lookup Data

2018-08-19 Thread Harshvardhan Agrawal
Hello Everyone,

Sorry for the delayed response.
This is what I am thinking of doing. We are thinking of creating 2
pipelines. The first one only enriches the position data with product
information. The second pipeline will use the enriched position and get all
the account information for performing aggregations.

*First Pipeline*:
1) Get the positions from Kafka and window data into tumbling windows of 30
seconds.
2) We perform a rolling aggregation that basically collects all the unique
product keys in a set.
3) At the end of the window, we have a process function that queries an
external service that performs a single lookup for all the unique products
we have seen in the window.
4) Persist the enriched positions to Kafka topic T1. There is a sink
process that reads from this Kafka topic (T1), writes to an underlying DB,
and persists to another Kafka topic (T2) for the second pipeline to read from.

*Second Pipeline*
1) Reads from topic T2 that contains enriched position.
2) For each position, we get the account information and lookup all the
parent and child accounts associated with that account.
3) Once we have all the accounts, we lookup all the enriched positions that
were created from the first pipeline for those accounts.
4) We perform the final aggregation to, say, calculate the Net Asset Value
for the account.
5) Persist the output to the DB.
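
For steps 1-4 of the first pipeline above, a rough sketch of the window plus
batched-lookup step. All names (Position, Product, EnrichedPosition,
ProductService, kafkaSinkT1) are hypothetical, and the rolling key collection
is folded into the ProcessWindowFunction for brevity; an incremental
AggregateFunction could pre-collect the keys instead.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only -- the domain types above are placeholders.
public class BatchedProductEnrichment
        extends ProcessWindowFunction<Position, EnrichedPosition, Long, TimeWindow> {

    private transient ProductService productService;

    @Override
    public void open(Configuration parameters) {
        productService = new ProductService(); // client for the external lookup service
    }

    @Override
    public void process(Long accountId, Context ctx, Iterable<Position> positions,
                        Collector<EnrichedPosition> out) {
        // Collect the unique product keys seen in this window (step 2).
        Set<Long> productKeys = new HashSet<>();
        for (Position p : positions) {
            productKeys.add(p.getProductKey());
        }
        // One batched call to the external service per window (step 3).
        Map<Long, Product> products = productService.lookupAll(productKeys);
        for (Position p : positions) {
            out.collect(new EnrichedPosition(p, products.get(p.getProductKey())));
        }
    }
}

// Wiring (steps 1 and 4): 30-second tumbling windows keyed by account,
// enriched output written to topic T1.
//   positions.keyBy(Position::getAccountId)
//            .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
//            .process(new BatchedProductEnrichment())
//            .addSink(kafkaSinkT1);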

Regards,
Harsh

On Wed, Jul 25, 2018 at 6:52 PM ashish pok  wrote:

> Hi Michael,
>
> We are currently using 15 TMs with 4 cores and 4 slots each, 10GB of
> memory on each TM. We have 15 partitions on Kafka for stream and 6 for
> context/smaller stream. Heap is around 50%, GC is about 150ms and CPU loads
> are low. We may be able to reduce resources on this if need be.
>
> Thanks,
>
>
> - Ashish
>
> On Wednesday, July 25, 2018, 4:07 AM, Michael Gendelman 
> wrote:
>
> Hi Ashish,
>
> We are planning for a similar use case and I was wondering if you can
> share the amount of resources you have allocated for this flow?
>
> Thanks,
> Michael
>
> On Tue, Jul 24, 2018, 18:57 ashish pok  wrote:
>
> BTW,
>
> We got around bootstrap problem for similar use case using a “nohup” topic
> as input stream. Our CICD pipeline currently passes an initialize option to
> app IF there is a need to bootstrap and waits for X minutes before taking a
> savepoint and restart app normally listening to right topic(s). I believe
> there is work underway to handle this gracefully using Side Input as well.
> Other than determining X minutes for initialization to complete, we haven't
> had any issue with this solution - we have over 40 million states refreshes
> daily and close to 200Mbps input streams being joined to states.
>
> Hope this helps!
>
>
>
> - Ashish
>
> On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <
> fearsome.lucid...@gmail.com> wrote:
>
> Alas, this suffers from the bootstrap problem.  At the moment Flink does
> not allow you to pause a source (the positions), so you can't fully consume
> and preload the accounts or products to perform the join before the
> positions start flowing.  Additionally, Flink SQL does not support
> materializing an upsert table for the accounts or products to perform the
> join, so you have to develop your own KeyedProcessFunction, maintain the
> state, and perform the join on your own if you only want to join against
> the latest value for each key.
>
> On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann 
> wrote:
>
> Yes, using Kafka which you initialize with the initial values and then
> feed changes to the Kafka topic from which you consume could be a solution.
>
> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi Till,
>
> How would we do the initial hydration of the Product and Account data
> since it’s currently in a relational DB? Do we have to copy over data to
> Kafka and then use them?
>
> Regards,
> Harsh
>
> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:
>
> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite
> elegantly with Flink's state. If you can ingest the product/account
> information changes as a stream, you can keep the latest version of it in
> Flink state by using a co-map function [1, 2]. One input of the co-map
> function would be the product/account update stream which updates the
> respective entries in Flink's state and the other input stream is the one
> to be enriched. When receiving input from this stream one would look up the
> latest information contained in the operator's state and join it with the
> incoming event.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
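
A minimal sketch of the state-based join Till describes. ProductUpdate,
Position and EnrichedPosition are hypothetical names, and a
RichCoFlatMapFunction is used rather than a plain CoMapFunction so the main
side can skip output when no reference data has arrived yet.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch only -- both inputs must be keyed by the product key for the keyed
// state below to work.
public class ProductEnrichmentJoin
        extends RichCoFlatMapFunction<ProductUpdate, Position, EnrichedPosition> {

    private transient ValueState<ProductUpdate> latestProduct;

    @Override
    public void open(Configuration parameters) {
        latestProduct = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-product", ProductUpdate.class));
    }

    // Reference-data side: remember the latest version per product key.
    @Override
    public void flatMap1(ProductUpdate update, Collector<EnrichedPosition> out) throws Exception {
        latestProduct.update(update);
    }

    // Main-data side: join against whatever reference version is currently stored.
    @Override
    public void flatMap2(Position position, Collector<EnrichedPosition> out) throws Exception {
        ProductUpdate product = latestProduct.value();
        if (product != null) {
            out.collect(new EnrichedPosition(position, product));
        }
        // else: buffer or emit un-enriched, depending on requirements
    }
}

// Wiring (sketch):
//   productUpdates.keyBy(u -> u.getProductKey())
//       .connect(positions.keyBy(p -> p.getProductKey()))
//       .flatMap(new ProductEnrichmentJoin());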
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
> Thanks for your responses.
>
> There is no fixed interval for the data 

Re: Error on SQL orderBy Error

2018-08-19 Thread Rong Rong
Filed https://issues.apache.org/jira/browse/FLINK-10172. It seems to be
affecting the latest version as well.

--
Rong

On Sun, Aug 19, 2018 at 8:14 AM Rong Rong  wrote:

> Hi Chris,
>
> This looks like a bug to me as
> val allOrders:Table = orderTable
> .select('id, 'order_date, 'amount, 'customer_id)
> .orderBy('id.asc)
>
> works perfectly fine. Could you file a bug report and kindly provide the
> Flink version you are using? I can take a look into it
>
> Thanks,
> Rong
>
> On Sun, Aug 19, 2018 at 5:42 AM chrisr123  wrote:
>
>> Use Case:
>> I have a CSV file with data that I want to do a SELECT with orderBy.
>> I'm getting this error below. What am I doing incorrectly? Thanks!
>>
>> *Expression (('id).asc).asc failed on input check: Sort should only based
>> on
>> field reference
>> *
>>
>> *Input File structure:*
>> id,order_date,amount,customer_id
>> 3000,2018-04-19,192.74,1005
>> 3001,2017-08-18,432.87,1000
>> 3002,2018-08-18,22.19,1002
>>
>> *Source code:*
>> CsvTableSource orderTableSource = CsvTableSource.builder()
>> .path("input/batch/orders.csv")
>> .ignoreFirstLine()
>> .fieldDelimiter(",")
>> .field("id", Types.INT())
>> .field("order_date", Types.SQL_DATE())
>> .field("amount", Types.DECIMAL())
>> .field("customer_id", Types.LONG())
>> .build();
>>
>>
>> tableEnv.registerTableSource("orders", orderTableSource);
>> Table orderTable = tableEnv.scan("orders");
>>
>> // SELECT *
>> // FROM orders
>> Table allOrders = orderTable
>> .select("id,order_date,amount,customer_id")
>> .orderBy("id.asc");
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: What's the advantage of using BroadcastState?

2018-08-19 Thread Rong Rong
Hi Paul,

To add to Hequn's answer. Broadcast state can typically be used as "a
low-throughput stream containing a set of rules which we want to evaluate
against all elements coming from another stream" [1]
So an addition to the difference list is: whether it is "broadcast" across all
keys when processing a keyed stream. This is typically useful when it is not
possible to derive the same key field using a KeySelector on the co-stream.
Another difference is performance: the BroadcastStream is "stored locally and
is used to process all incoming elements on the other stream", so you need to
carefully manage the size of the BroadcastStream.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
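
A condensed sketch of that rule-evaluation pattern from [1]. Rule, Event and
Alert are hypothetical types, and ruleStream/keyedEvents are assumed to exist;
imports from org.apache.flink.api.common.state and
org.apache.flink.streaming.api are omitted for brevity.

// Sketch only: inside the job's main() method.
MapStateDescriptor<String, Rule> rulesDescriptor =
        new MapStateDescriptor<>("rules", String.class, Rule.class);

// The low-throughput rule stream is broadcast to every parallel task.
BroadcastStream<Rule> broadcastRules = ruleStream.broadcast(rulesDescriptor);

DataStream<Alert> alerts = keyedEvents
    .connect(broadcastRules)
    .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Alert>() {

        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)
                throws Exception {
            // Only the broadcast side gets read-write access to the broadcast
            // state, which keeps the contents identical on every parallel task.
            ctx.getBroadcastState(rulesDescriptor).put(rule.getName(), rule);
        }

        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out)
                throws Exception {
            // The element side gets read-only access and evaluates all current rules.
            for (Map.Entry<String, Rule> entry :
                    ctx.getBroadcastState(rulesDescriptor).immutableEntries()) {
                if (entry.getValue().matches(event)) {
                    out.collect(new Alert(entry.getKey(), event));
                }
            }
        }
    });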

On Sun, Aug 19, 2018 at 1:40 AM Hequn Cheng  wrote:

> Hi Paul,
>
> There are some differences:
> 1. The BroadcastStream can broadcast data for you, i.e., data will be
> broadcast to all downstream tasks automatically.
> 2. To guarantee that the contents in the Broadcast State are the same
> across all parallel instances of our operator, read-write access is only
> given to the broadcast side
> 3. For BroadcastState, Flink guarantees that upon restoring/rescaling
> there will be no duplicates and no missing data. In case of recovery with
> the same or smaller parallelism, each task reads its checkpointed state.
> Upon scaling up, each task reads its own state, and the remaining tasks
> (p_new-p_old) read checkpoints of previous tasks in a round-robin manner.
> While MapState doesn't have such abilities.
>
> Best, Hequn
>
> On Sun, Aug 19, 2018 at 11:18 AM, Paul Lam  wrote:
>
>> Hi,
>>
>> AFAIK, the difference between a BroadcastStream and a normal DataStream
>> is that the BroadcastStream comes with a BroadcastState, but it seems that the
>> functionality of BroadcastState can also be achieved by MapState in a
>> CoMapFunction or something since the control stream is still broadcasted
>> without being turned into BroadcastStream. So, I’m wondering what’s the
>> advantage of using BroadcastState? Thanks a lot!
>>
>> Best Regards,
>> Paul Lam
>>
>
>


Re: Error on SQL orderBy Error

2018-08-19 Thread Rong Rong
Hi Chris,

This looks like a bug to me as
val allOrders:Table = orderTable
.select('id, 'order_date, 'amount, 'customer_id)
.orderBy('id.asc)

works perfectly fine. Could you file a bug report and kindly provide the
Flink version you are using? I can take a look into it

Thanks,
Rong

On Sun, Aug 19, 2018 at 5:42 AM chrisr123  wrote:

> Use Case:
> I have a CSV file with data that I want to do a SELECT with orderBy.
> I'm getting this error below. What am I doing incorrectly? Thanks!
>
> *Expression (('id).asc).asc failed on input check: Sort should only based
> on
> field reference
> *
>
> *Input File structure:*
> id,order_date,amount,customer_id
> 3000,2018-04-19,192.74,1005
> 3001,2017-08-18,432.87,1000
> 3002,2018-08-18,22.19,1002
>
> *Source code:*
> CsvTableSource orderTableSource = CsvTableSource.builder()
> .path("input/batch/orders.csv")
> .ignoreFirstLine()
> .fieldDelimiter(",")
> .field("id", Types.INT())
> .field("order_date", Types.SQL_DATE())
> .field("amount", Types.DECIMAL())
> .field("customer_id", Types.LONG())
> .build();
>
>
> tableEnv.registerTableSource("orders", orderTableSource);
> Table orderTable = tableEnv.scan("orders");
>
> // SELECT *
> // FROM orders
> Table allOrders = orderTable
> .select("id,order_date,amount,customer_id")
> .orderBy("id.asc");
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: processWindowFunction

2018-08-19 Thread vino yang
Hi antonio,

Regarding your scenario, I think you could consider using the ProcessFunction
(or KeyedProcessFunction) directly on the stream. [1]
It lets you handle each of your elements together with a Timer, and you can
combine it with Flink's state API [2] to store your data.

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#process-function-low-level-operations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#working-with-state
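
To make that concrete for the threshold case, a simplified sketch with a single
threshold only. Txn and ScoredTxn are hypothetical types, the stream is assumed
to be keyed by the transaction id, and a real version would track the 10/20/60
minute buckets separately and coalesce its timers.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only -- Txn and ScoredTxn are placeholders for the real event types.
public class ThresholdFunction extends KeyedProcessFunction<String, Txn, ScoredTxn> {

    private transient ValueState<Double> runningSum;
    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        runningSum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Double.class));
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(Txn txn, Context ctx, Collector<ScoredTxn> out) throws Exception {
        double sum = runningSum.value() == null ? 0.0 : runningSum.value();
        int cnt = count.value() == null ? 0 : count.value();
        sum += txn.getAmount();
        cnt += 1;
        runningSum.update(sum);
        count.update(cnt);

        // Evaluate the rule on every element and emit the object either way,
        // only marking it when the thresholds are hit.
        out.collect(new ScoredTxn(txn, sum >= 200.0 && cnt >= 3));

        // Register a cleanup timer 60 minutes after this element so the per-key
        // state does not grow forever.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60 * 60 * 1000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ScoredTxn> out) {
        runningSum.clear();
        count.clear();
    }
}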

Thanks, vino.

antonio saldivar wrote on Sun, Aug 19, 2018 at 10:18 AM:

> hi Vino
>
> Is it possible to use a global window and then set the trigger onElement
> to compare the arriving element against, for example, 10, 20 and 60 minutes
> of data?
>
> I have rules evaluating the sum of amount over 10, 20 or 60 minutes for the
> same keyed element. If the same id sums to, say, $200 total within one of
> those thresholds and the count is greater than or equal to 3, I need to set
> some values on the object; if the object does not reach those thresholds I
> do not set the values, and I keep sending the output with or without those
> values.
>
> Just processing the object on the fly and sending the output.
>
> On Fri, Aug 17, 2018 at 22:14, vino yang ()
> wrote:
>
>> Hi antonio,
>>
>> Yes, ProcessWindowFunction is a very low-level window function.
>> It allows you to access the data in the window and to customize the output
>> of the window.
>> So while it gives you flexibility, you also need to take care of more things
>> yourself, which may require you to write more processing logic.
>>
>> Generally speaking, sliding windows usually produce some data that is
>> repeated, but a common pattern is to apply a reduce function on the window
>> to get your calculation results.
>> If you only send the data on, there will definitely be some duplication.
>>
>> Thanks, vino.
>>
>> antonio saldivar wrote on Fri, Aug 17, 2018 at 12:01 PM:
>>
>>> Hi Vino
>>> Thank you for the information. Actually I am using a trigger alert and
>>> a processWindowFunction to send my results, but when my window slides or ends
>>> it sends the objects again and I am getting duplicated data.
>>>
>>> On Thu, Aug 16, 2018 at 22:05, vino yang ()
>>> wrote:
>>>
 Hi Antonio,

 What results do you not want to get when each window is created?
 Examples of the use of ProcessWindowFunction are included in many test
 files in Flink's project, such as SideOutputITCase.scala or
 WindowTranslationTest.scala.

 For more information on ProcessWindowFunction, you can refer to the
 official website.[1]

 [1]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction

 Thanks, vino.

 antonio saldivar wrote on Fri, Aug 17, 2018 at 6:24 AM:

> Hello
>
> I am implementing a data stream where I use sliding windows, but I am
> stuck because I need to set values on my object based on some if statements
> in my process function and send the object to the next step, but I don't
> want results every time a window is created.
>
> if anyone has a good example on this that can help me
>



Re: Job Manager killed by Kubernetes during recovery

2018-08-19 Thread vino yang
Hi Bruno,

Pinging Till for you; he may be able to give you some useful information.

Thanks, vino.

Bruno Aranda wrote on Sun, Aug 19, 2018 at 6:57 AM:

> Hi,
>
> I am experiencing an issue when a job manager is trying to recover using a
> HA setup. When the job manager starts again and tries to resume from the
> last checkpoints, it gets killed by Kubernetes (I guess), since I can see
> the following in the logs while the jobs are deployed:
>
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>
> I am requesting enough memory for it, 3000Gi, and it is configured to use
> 2048Gb of memory. I have tried to increase the max perm size, but did not
> see an improvement.
>
> Any suggestions to help diagnose this?
>
> I have the following:
>
> Flink 1.6.0 (same with 1.5.1)
> Azure AKS with Kubernetes 1.11
> State management using RocksDB with checkpoints stored in Azure Data Lake
>
> Thanks!
>
> Bruno
>
>


Error on SQL orderBy Error

2018-08-19 Thread chrisr123
Use Case:
I have a CSV file with data that I want to do a SELECT with orderBy.
I'm getting this error below. What am I doing incorrectly? Thanks!

*Expression (('id).asc).asc failed on input check: Sort should only based on
field reference
*

*Input File structure:*
id,order_date,amount,customer_id
3000,2018-04-19,192.74,1005
3001,2017-08-18,432.87,1000
3002,2018-08-18,22.19,1002

*Source code:*
CsvTableSource orderTableSource = CsvTableSource.builder()
.path("input/batch/orders.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id", Types.INT())
.field("order_date", Types.SQL_DATE())
.field("amount", Types.DECIMAL())
.field("customer_id", Types.LONG())
.build();


tableEnv.registerTableSource("orders", orderTableSource);
Table orderTable = tableEnv.scan("orders");

// SELECT *
// FROM orders
Table allOrders = orderTable
.select("id,order_date,amount,customer_id")
.orderBy("id.asc");



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error while trigger checkpoint due to Kyro Exception

2018-08-19 Thread Stefan Richter
Hi,

this problem is fixed in Flink >= 1.4.3, see
https://issues.apache.org/jira/browse/FLINK-8836.

Best,
Stefan

> On Aug 18, 2018, at 05:07, Bruce Qiu wrote:
> 
> Hi Community,
> I am using Flink 1.4.2 to do stream processing. I fetch data from Kafka 
> and write Parquet files to HDFS. In the previous environment, the Kafka 
> topic had 192 partitions and I set the source parallelism to 192, and the 
> application worked fine. But recently we increased the Kafka partitions to 
> 384, so I changed the source parallelism to 384. After I made this change, 
> the application throws the exception below, and the checkpoint always fails. 
> I also see that the backpressure is very high in the ColFlatMap stage. My 
> application's DAG is shown below. Can someone help me with this exception? 
> Thanks a lot.
>  
> DAG Stage: (image attachment omitted)
>  
> Exception stack trace:
>  
> java.lang.Exception: Error while triggering checkpoint 109 for Source: Custom 
> Source (257/384)
>   at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1210)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not perform checkpoint 109 for operator 
> Source: Custom Source (257/384).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:544)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111)
>   at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1199)
>   ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 109 for operator 
> Source: Custom Source (257/384).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:538)
>   ... 7 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>   at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>   at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>   at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.(DefaultOperatorStateBackend.java:448)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:460)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:220)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:363)
>   ... 12 more
>  
>  
> Regards,
> Bruce
> 



Re: What's the advantage of using BroadcastState?

2018-08-19 Thread Hequn Cheng
Hi Paul,

There are some differences:
1. The BroadcastStream can broadcast data for you, i.e., data will be
broadcast to all downstream tasks automatically.
2. To guarantee that the contents in the Broadcast State are the same
across all parallel instances of our operator, read-write access is only
given to the broadcast side
3. For BroadcastState, Flink guarantees that upon restoring/rescaling there
will be no duplicates and no missing data. In case of recovery with the
same or smaller parallelism, each task reads its checkpointed state. Upon
scaling up, each task reads its own state, and the remaining tasks
(p_new-p_old) read checkpoints of previous tasks in a round-robin manner.
While MapState doesn't have such abilities.

Best, Hequn

On Sun, Aug 19, 2018 at 11:18 AM, Paul Lam  wrote:

> Hi,
>
> AFAIK, the difference between a BroadcastStream and a normal DataStream is
> that the BroadcastStream comes with a BroadcastState, but it seems that the
> functionality of BroadcastState can also be achieved by MapState in a
> CoMapFunction or something since the control stream is still broadcasted
> without being turned into BroadcastStream. So, I’m wondering what’s the
> advantage of using BroadcastState? Thanks a lot!
>
> Best Regards,
> Paul Lam
>