Re: Does Flink SQL "in" operation has length limit?

2018-10-01 Thread Rong Rong
Hi Fabian. Yes, I think that was what I missed. I haven't looked into the
code but was just inferring from the translated plan pasted by Henry.

Let me try to take a look and put in a fix for this.

Thanks,
Rong

On Mon, Oct 1, 2018, 7:28 AM Fabian Hueske  wrote:

> Hi,
>
> I had a look into the code. From what I saw, we are translating the values
> into Rows.
> The problem here is that the IN clause is translated into a join and that
> the join result contains a time attribute field. This is a safety
> restriction to ensure that time attributes do not lose their watermark
> alignment because joins can return their results in random order. This
> should be related to or same as [1].
>
> Anyway, we should not translate IN clauses to joins for incrementally
> evaluated queries (aka. streaming queries).
> The main problem here is that the join materializes both inputs which is
> fine for the VALUES input but not for the "stream".
> I created FLINK-10474 to fix the problem.
>
> A workaround for the problem could be a user-defined scalar function that
> replaces the IN clause.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-10211
> [2] https://issues.apache.org/jira/browse/FLINK-10474
>
> Am Mo., 1. Okt. 2018 um 10:01 Uhr schrieb Timo Walther  >:
>
>> Hi,
>>
>> tuple should not be used anywhere in flink-table. @Rong can you point us
>> to the corresponding code? I haven't looked into the code but we should
>> definitely support this query. @Henry feel free to open an issue for it.
>>
>> Regards,
>> Timo
>>
>>
>> Am 28.09.18 um 19:14 schrieb Rong Rong:
>>
>> Yes.
>>
>> Thanks for bringing this up Hequn! :-) I think Tuple would not be the
>> best container to use.
>>
>> However, in the search for an alternative, wouldn't a Collection / List be a more
>> suitable solution? Row does not seem to fit the context (as there can be
>> Rows with elements of different types).
>> I vaguely recall there was a similar JIRA, but it might not be related to the IN
>> clause. Let me try to dig it up.
>>
>> --
>> Rong
>>
>> On Fri, Sep 28, 2018 at 9:32 AM Hequn Cheng  wrote:
>>
>>> Hi,
>>>
>>> I haven't looked into the code. If this is limited by Tuple, would it
>>> be better to implement it with Row?
>>>
>>> Best, Hequn
>>>
>>> On Fri, Sep 28, 2018 at 9:27 PM Rong Rong  wrote:
>>>
 Hi Henry, Vino.

 I think the IN operator is translated into either a RexSubQuery or a
 SqlStdOperatorTable.IN operator.
 I think Vino was referring to the first case.
 For the second case (which I think is what you are facing here), the values are
 converted into tuples, and the maximum we currently have in Flink is
 Tuple25.java, so I was wondering if that is the issue you are facing.
 You can probably split the IN into several smaller INs combined with OR.

 --
 Rong

 On Fri, Sep 28, 2018 at 2:33 AM vino yang 
 wrote:

> Hi Henry,
>
> Maybe the number of elements in your IN clause is out of range? Its
> default value is 20, you can modify it with this configuration item:
>
> *withInSubQueryThreshold(XXX)*
>
> This API comes from Calcite.
>
> Thanks, vino.
>
> 徐涛  于2018年9月28日周五 下午4:23写道:
>
>> Hi,
>>
>> When I am executing the following SQL in Flink 1.6.1, an error is thrown
>> saying that there is a support issue, but when I reduce the
>> number of integers in the “in” clause, for example,
>>
>> trackId in (124427150,71648998), Flink does not complain at all,
>> so I wonder: is there any length limit on the “in” operation?
>>
>> Thanks a lot.
>>
>> SELECT
>> trackId as id,track_title as description, count(*) as cnt
>> FROM
>> play
>> WHERE
>> appName='play.statistics.trace' and
>> trackId in 
>> (124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
>> GROUP BY
>> HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' 
>> MINUTE),trackId,track_title;
>>
>>
>>
>> FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
>>   FlinkLogicalCalc(expr#0..3=[{inputs}], started_at_ts=[$t2],
>> trackId=[$t0], track_title=[$t1])
>> FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
>>   FlinkLogicalCalc(expr#0..4=[{inputs}],
>> expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)],
>> trackId=[$t1], track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
>> FlinkLogicalNativeTableScan(table=[[play]])
>>   FlinkLogicalValues(tuples=[[{ 124427150 }, { 71648998 }, 

Event timers - metrics

2018-10-01 Thread Alexey Trenikhun
Hello,
Are there built-in timer metrics? For example, the number of registered timers,
the number of triggered timers, etc.

Thanks,
Alexey


Flink support for multiple data centers

2018-10-01 Thread Olga Luganska
Hello,

Does Flink support multi-data-center deployments and failover procedures
in case one of the data centers goes down?

Another question I have is about data encryption. If application state which
needs to be checkpointed contains data elements which are considered to be
personally identifiable information, or maybe some credit card information, do
you provide any encryption mechanisms to make sure that this data will be 
secured?

Thank you very much,
Olga


Re: Deserialization of serializer errored

2018-10-01 Thread Elias Levy
Any of the Flink folks seen this before?

On Fri, Sep 28, 2018 at 5:23 PM Elias Levy 
wrote:

> I am experiencing a rather odd error.  We have a job running on a Flink
> 1.4.2 cluster with two Kafka input streams, one of the streams is processed
> by an async function, and the output of the async function and the other
> original stream are consumed by a CoProcessOperator, which in turn emits
> Scala case class instances, that go into a stateful ProcessFunction filter,
> and then into a sink.  I.e.
>
> source 1 -> async function --\
>                                |--> co process --> process --> sink
> source 2 ----------------------/
>
> A field was added to the output case class and the job would no longer start
> up from a save point.  I assumed this was a result of a serializer
> incompatibility.  I verified this by reversing the addition of the field
> and the job could then restore from the previous savepoint.  So far it
> makes sense.
>
> Then I decided to leave the new field in the case class, but eliminated
> most of the DAG, leaving only the source 1 --> async function portion of
> it.  The case class is emitted by the co process.  So this removed the
> modified case class from the processing graph.  When I try to restore from
> the savepoint, even if Allow Non Restored State is selected, the job fails
> to restore with the error "Deserialization of serializer errored".
>
> So then I decided to completely eliminate the modified case class.  I
> removed all trace of it from the job, again only leaving the source 1 ->
> async function.  I tried to restore this job, with no traces of the case
> class, and still the job failed with the "Deserialization of serializer
> erroed" even when Allow Non Restored State is selected.
>
> Anyone seen anything like this?
>
> This is the error being generated:
>
> WARN
>  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  -
> Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
> at
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
> at
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
> at java.io.ObjectInputStream.readClassDesc(Unknown Source)
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> at java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> at java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.io.ObjectInputStream.readObject(Unknown Source)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
> ... 14 more
> Caused by: java.lang.ClassNotFoundException:
> com.somewhere.TestJob$$anon$13$$anon$3
> at java.net.URLClassLoader.findClass(Unknown Source)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> at 

Re: Streaming to Parquet Files in HDFS

2018-10-01 Thread Biswajit Das
Nice to see this finally!

On Mon, Oct 1, 2018 at 1:53 AM Fabian Hueske  wrote:

> Hi Bill,
>
> Flink 1.6.0 supports writing Avro records as Parquet files to HDFS via the
> previously mentioned StreamingFileSink [1], [2].
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9753
> [2] https://issues.apache.org/jira/browse/FLINK-9750
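
For anyone looking for a starting point, here is a minimal sketch of such a sink
(assuming Flink 1.6 with the flink-parquet and Avro dependencies on the classpath;
MyAvroRecord is a placeholder for a generated Avro SpecificRecord class):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkSketch {

    // Attaches a Parquet bulk sink to a stream of Avro records.
    // Part files are finalized on checkpoints, so checkpointing must be enabled.
    public static void addParquetSink(DataStream<MyAvroRecord> records) {
        StreamingFileSink<MyAvroRecord> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("hdfs:///logs/parquet"),
                        ParquetAvroWriters.forSpecificRecord(MyAvroRecord.class))
                .build();
        records.addSink(sink);
    }
}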
>
> Am Fr., 28. Sep. 2018 um 23:36 Uhr schrieb hao gao :
>
>> Hi Bill,
>>
>> I wrote those two Medium posts you mentioned above. But clearly, the
>> techlab one is much better.
>> I would suggest just "close the file when checkpointing" which is the
>> easiest way. If you use BucketingSink, you can modify the code to make it
>> work. Just replace the code from line 691 to 693 with
>> closeCurrentPartFile()
>>
>> https://github.com/apache/flink/blob/release-1.3.2-rc1/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L691
>> This should guarantee exactly-once. You may have some files with an
>> underscore prefix when the Flink job fails, but usually those files are
>> ignored by the query engines/readers, for example Presto.
>>
>> If you use 1.6 or later, I think the issue is already addressed:
>> https://issues.apache.org/jira/browse/FLINK-9750
>>
>> Thanks
>> Hao
>>
>> On Fri, Sep 28, 2018 at 1:57 PM William Speirs 
>> wrote:
>>
>>> I'm trying to stream log messages (syslog fed into Kafka) into Parquet
>>> files on HDFS via Flink. I'm able to read, parse, and construct objects for
>>> my messages in Flink; however, writing to Parquet is tripping me up. I do
>>> *not* need to have this be real-time; a delay of a few minutes, even up to
>>> an hour, is fine.
>>>
>>> I've found the following articles talking about this being very
>>> difficult:
>>> *
>>> https://medium.com/hadoop-noob/a-realtime-flink-parquet-data-warehouse-df8c3bd7401
>>> * https://medium.com/hadoop-noob/flink-parquet-writer-d127f745b519
>>> *
>>> https://techlab.bol.com/how-not-to-sink-a-data-stream-to-files-journeys-from-kafka-to-parquet/
>>> *
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rolling-sink-parquet-Avro-output-td11123.html
>>>
>>> All of these posts speak of troubles using the check-pointing mechanisms
>>> and Parquet's need to perform batch writes. I'm not experienced enough with
>>> Flink's check-pointing or Parquet's file format to completely understand
>>> the issue. So my questions are as follows:
>>>
>>> 1) Is this possible in Flink in an exactly-once way? If not, is it
>>> possible in a way that _might_ cause duplicates during an error?
>>>
>>> 2) Is there another/better format to use other than Parquet that offers
>>> compression and the ability to be queried by something like Drill or Impala?
>>>
>>> 3) Any further recommendations for solving the overall problem:
>>> ingesting syslogs and writing them to a file(s) that is searchable by an
>>> SQL(-like) framework?
>>>
>>> Thanks!
>>>
>>> Bill-
>>>
>>
>>
>> --
>> Thanks
>>  - Hao
>>
>


Re: flink:latest container on kubernetes fails to connect taskmanager to jobmanager

2018-10-01 Thread jwatte
It turns out that the latest flink:latest docker image is 5 days old, and
this bug was fixed 4 days ago in the flink-docker GitHub repository.

The problem is that the docker-entrypoint.sh script chains to jobmanager.sh
by saying "start-foreground cluster" where the "cluster" argument is
obsolete as of Flink 1.5.

I patched it with a sed command in the Kubernetes manifest until the
updated docker image makes its way out to the world.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


flink:latest container on kubernetes fails to connect taskmanager to jobmanager

2018-10-01 Thread jwatte
I'm using the standard Kubernetes deploy configs for jobmanager and
taskmanager deployments, and jobmanager service.
However, when the task managers start up, they try to register with the job
manager over Akka on port 6123.
This fails, because the Akka on the jobmanager discards those messages as
"non-local."

The taskmanager keeps repeating this log message and eventually exiting
(and getting restarted by Kubernetes):

2018-10-01 20:08:28,365 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:6123/user/resourcemanager, retrying in
1 ms: Ask timed out on
[ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:6123/),
Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of
type "akka.actor.Identify"..

The jobmanager responds with this log message:

2018-10-01 20:09:38,475 ERROR akka.remote.EndpointWriter
   
- dropping message [class akka.actor.ActorSelectionMessage] for non-local
recipient [Actor[akka.tcp://flink@flink-jobmanager:6123/]] arriving at
[akka.tcp://flink@flink-jobmanager:6123] inbound addresses are
[akka.tcp://flink@cluster:6123]

I have verified that network connectivity exists, so this is some
configuration problem.
I notice that the docker-entrypoint.sh edits the config files and calls the
taskmanager.sh / jobmanager.sh scripts based on start mode.
Is this script editing the config file incorrectly? What needs to be done so that
Akka on the jobmanager accepts the registration messages?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: error with flink

2018-10-01 Thread yuvraj singh
I am using 1.6.0.



On Mon, Oct 1, 2018 at 8:05 PM Hequn Cheng  wrote:

> Hi yuvraj,
>
> It seems a null key has been used in keyBy. Which Flink version do you use? And
> could you show some of the user code related to keyBy or GroupBy?
>
> Best, Hequn
>
> On Mon, Oct 1, 2018 at 9:26 PM yuvraj singh <19yuvrajsing...@gmail.com>
> wrote:
>
>> Hi, I am facing this problem with my Flink job. Please help me with it.
>>
>>
>>
>>
>> java.lang.Exception: An async function call terminated with an exception.
>> Failing the AsyncWaitOperator.
>>
>> at
>> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
>>
>> at
>> org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.lang.RuntimeException
>>
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>>
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>>
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>
>> at
>> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133)
>>
>> ... 2 more
>>
>> Caused by: java.lang.NullPointerException
>>
>> at
>> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>>
>> at
>> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>>
>> at
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>>
>> at
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>>
>> at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>>
>> at
>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>>
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>
>> ... 8 more
>>
>>
>>
>>


Re: error with flink

2018-10-01 Thread Hequn Cheng
Hi yuvraj,

It seems a null key has been used in keyBy. Which Flink version do you use? And
could you show some of the user code related to keyBy or GroupBy?
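
In case it helps while you collect that code: the NullPointerException in
KeyGroupRangeAssignment.assignToKeyGroup is typically thrown when the KeySelector
returns null for some record. A minimal sketch of a null-safe selector (the record
type Tuple2<String, Long> is just a stand-in for whatever your job keys on):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Maps a possibly-null key field to a non-null placeholder, so that
// key-group assignment never sees a null key.
public class NullSafeKey implements KeySelector<Tuple2<String, Long>, String> {
    @Override
    public String getKey(Tuple2<String, Long> record) {
        return record.f0 != null ? record.f0 : "UNKNOWN";
    }
}

// Usage (sketch): stream.keyBy(new NullSafeKey())...
// Alternatively, filter such records out before the keyBy.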

Best, Hequn

On Mon, Oct 1, 2018 at 9:26 PM yuvraj singh <19yuvrajsing...@gmail.com>
wrote:

> Hi, I am facing this problem with my Flink job. Please help me with it.
>
>
>
>
> java.lang.Exception: An async function call terminated with an exception.
> Failing the AsyncWaitOperator.
>
> at
> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
>
> at
> org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.RuntimeException
>
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>
> at
> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133)
>
> ... 2 more
>
> Caused by: java.lang.NullPointerException
>
> at
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>
> at
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>
> at
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>
> at
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>
> at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>
> ... 8 more
>
>
>
>


Re: Does Flink SQL "in" operation has length limit?

2018-10-01 Thread Fabian Hueske
Hi,

I had a look into the code. From what I saw, we are translating the values
into Rows.
The problem here is that the IN clause is translated into a join and that
the join result contains a time attribute field. This is a safety
restriction to ensure that time attributes do not lose their watermark
alignment because joins can return their results in random order. This
should be related to or same as [1].

Anyway, we should not translate IN clauses to joins for incrementally
evaluated queries (aka. streaming queries).
The main problem here is that the join materializes both inputs which is
fine for the VALUES input but not for the "stream".
I created FLINK-10474 to fix the problem.

A workaround for the problem could be a user-defined scalar function that
replaces the IN clause.
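
A minimal sketch of such a workaround (assuming the Flink 1.6 Table API; the class
name and the hard-coded id set are only for illustration):

import org.apache.flink.table.functions.ScalarFunction;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// UDF that replaces "trackId IN (...)" with "isTrackedId(trackId)".
public class IsTrackedId extends ScalarFunction {

    // The constant values from the IN clause, kept in a set for O(1) lookups.
    private static final Set<Long> IDS = new HashSet<>(Arrays.asList(
            124427150L, 71648998L, 124493327L /* ... remaining ids ... */));

    public boolean eval(long trackId) {
        return IDS.contains(trackId);
    }
}

// Registration and use (sketch):
//   tableEnv.registerFunction("isTrackedId", new IsTrackedId());
//   ... WHERE appName = 'play.statistics.trace' AND isTrackedId(trackId) ...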

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10211
[2] https://issues.apache.org/jira/browse/FLINK-10474

Am Mo., 1. Okt. 2018 um 10:01 Uhr schrieb Timo Walther :

> Hi,
>
> tuple should not be used anywhere in flink-table. @Rong can you point us
> to the corresponding code? I haven't looked into the code but we should
> definitely support this query. @Henry feel free to open an issue for it.
>
> Regards,
> Timo
>
>
> Am 28.09.18 um 19:14 schrieb Rong Rong:
>
> Yes.
>
> Thanks for bringing this up Hequn! :-) I think Tuple would not be the best
> container to use.
>
> However, in the search for an alternative, wouldn't a Collection / List be a more
> suitable solution? Row does not seem to fit the context (as there can be
> Rows with elements of different types).
> I vaguely recall there was a similar JIRA, but it might not be related to the IN
> clause. Let me try to dig it up.
>
> --
> Rong
>
> On Fri, Sep 28, 2018 at 9:32 AM Hequn Cheng  wrote:
>
>> Hi,
>>
>> I haven't looked into the code. If this is limited by Tuple, would it
>> be better to implement it with Row?
>>
>> Best, Hequn
>>
>> On Fri, Sep 28, 2018 at 9:27 PM Rong Rong  wrote:
>>
>>> Hi Henry, Vino.
>>>
>>> I think the IN operator is translated into either a RexSubQuery or a
>>> SqlStdOperatorTable.IN operator.
>>> I think Vino was referring to the first case.
>>> For the second case (which I think is what you are facing here), the values are
>>> converted into tuples, and the maximum we currently have in Flink is
>>> Tuple25.java, so I was wondering if that is the issue you are facing. You
>>> can probably split the IN into several smaller INs combined with OR.
>>>
>>> --
>>> Rong
>>>
>>> On Fri, Sep 28, 2018 at 2:33 AM vino yang  wrote:
>>>
 Hi Henry,

 Maybe the number of elements in your IN clause is out of range? Its
 default value is 20, you can modify it with this configuration item:

 *withInSubQueryThreshold(XXX)*

 This API comes from Calcite.

 Thanks, vino.

 徐涛  于2018年9月28日周五 下午4:23写道:

> Hi,
>
> When I am executing the following SQL in Flink 1.6.1, an error is thrown
> saying that there is a support issue, but when I reduce the
> number of integers in the “in” clause, for example,
>
> trackId in (124427150,71648998), Flink does not complain at all,
> so I wonder: is there any length limit on the “in” operation?
>
> Thanks a lot.
>
> SELECT
> trackId as id,track_title as description, count(*) as cnt
> FROM
> play
> WHERE
> appName='play.statistics.trace' and
> trackId in 
> (124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
> GROUP BY
> HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' 
> MINUTE),trackId,track_title;
>
>
>
> FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
>   FlinkLogicalCalc(expr#0..3=[{inputs}], started_at_ts=[$t2],
> trackId=[$t0], track_title=[$t1])
> FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
>   FlinkLogicalCalc(expr#0..4=[{inputs}],
> expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)],
> trackId=[$t1], track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
> FlinkLogicalNativeTableScan(table=[[play]])
>   FlinkLogicalValues(tuples=[[{ 124427150 }, { 71648998 }, {
> 124493327 }, { 524043 }, { 27300837 }, { 30300481 }, { 27300809 }, {
> 124744768 }, { 45982512 }, { 124526566 }, { 124556427 }, { 124804208 }, {
> 74302264 }, { 119588973 }, { 30496269 }, { 27300288 }, { 124098818 }, {
> 125071530 }, { 120918746 }, { 124171456 }, { 30413034 }, { 124888075 }, {
> 125270551 }, { 125434224 }, { 27300195 }, { 45982342 }, { 45982468 }, {
> 45982355 }, 

Re: About the retract of the calculation result of flink sql

2018-10-01 Thread Hequn Cheng
Hi clay,

Keyed group by:

> SELECT a, SUM(b) as d
> FROM Orders
> GROUP BY a


Non Keyed group by:

> SELECT SUM(b) as d
> FROM Orders


I would like to look into the problem. However, I can't find obvious
problems in the SQL. It would be great if you could provide a minimal example
to reproduce the issue. Also, use a print sink to avoid sinking into multiple
Kafka partitions, since that can also cause out-of-order problems.

Best, Hequn

On Mon, Oct 1, 2018 at 9:11 PM clay  wrote:

> hi,Timo
>
> I use env.setParallelism(1) in my code to set the overall degree of
> parallelism of the program to 1. Will some calculations still be
> parallelized?
>
> clay,
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: About the retract of the calculation result of flink sql

2018-10-01 Thread Fabian Hueske
Hi Clay,

If you do env.setParallelism(1), the query won't be executed in parallel.
However, looking at your screenshot the message order does not seem to be
the problem here (given that you printed the content of the topic).

Are you sure that it is not possible that the result decreases if some rows
are added to one of the input tables?
I don't have time to dig into your query, but the HAVING clause or the left
join and (u.id is null) predicate look a bit suspicious to me.

Would it be possible to create a minimal example that reproduces the issue?

Best, Fabian

Am Mo., 1. Okt. 2018 um 15:11 Uhr schrieb clay :

> hi,Timo
>
> I use env.setParallelism(1) in my code to set the overall degree of
> parallelism of the program to 1. Will some calculations still be
> parallelized?
>
> clay,
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


error with flink

2018-10-01 Thread yuvraj singh
Hi, I am facing this problem with my Flink job. Please help me with it.




java.lang.Exception: An async function call terminated with an exception.
Failing the AsyncWaitOperator.

at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)

at
org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.RuntimeException

at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)

at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)

at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)

at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133)

... 2 more

Caused by: java.lang.NullPointerException

at
org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)

at
org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)

at
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)

at
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)

at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)

at
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)

at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)

... 8 more


Re: About the retract of the calculation result of flink sql

2018-10-01 Thread clay4444
hi,Timo

I use env.setParallelism(1) in my code to set the overall degree of
parallelism of the program to 1. Will some calculations still be
parallelized?

clay,




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: About the retract of the calculation result of flink sql

2018-10-01 Thread clay4444
hi,Hequn

I don't understand what you mean about keyed group by and non-keyed group by. Can you
explain it in a little more detail, or give me an example? Thank you.

clay,




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Scheduler Customization

2018-10-01 Thread Hequn Cheng
Hi Ananth,

> if it detects significant backlog, skip over and consume from the latest
offset, and schedule a separate backfill task for the backlogged 20 minutes?

Fabian is right, there are no built-in operators for this.
If you don't care about watermarks, I think we can implement it with a
custom source which can either sleep or consume data within a time range.

The job looks like:
Source1(s1) ->
 Union -> Sink
Source2(s2) ->

The job works as follows:
- t1: s1 working, s2 sleep
- t2: There is an outage of Elastic Search cluster
- t3: ES is available. s1 resumes from t1 and ends at t3. s2 starts from t3
directly.
- t4: s1 sleep, s2 working

To achieve this, we should also find a way to exchange progress between
the two sources. For example, sync the source status via an HBase or a MySQL
table.
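
A rough skeleton of such a source (just a sketch; SourceStatusStore is a made-up
interface standing in for the shared HBase/MySQL progress table mentioned above):

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Made-up interface for the shared progress table (e.g. backed by MySQL or HBase).
interface SourceStatusStore {
    boolean isActive(String sourceName);          // should this source emit right now?
    long catchUpEndTimestamp(String sourceName);  // where the backfill range ends
}

// A source that either sleeps or consumes data within a time range,
// depending on the shared status, as described above.
public abstract class RangeAwareSource<T> extends RichSourceFunction<T> {

    private final String name;
    private final SourceStatusStore status;
    private volatile boolean running = true;

    protected RangeAwareSource(String name, SourceStatusStore status) {
        this.name = name;
        this.status = status;
    }

    // Reads and emits records with timestamps up to endTimestamp (e.g. from Kafka).
    protected abstract void consumeUpTo(long endTimestamp, SourceContext<T> ctx) throws Exception;

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        while (running) {
            if (status.isActive(name)) {
                consumeUpTo(status.catchUpEndTimestamp(name), ctx);
            } else {
                Thread.sleep(1000L);  // sleep while the other source is responsible
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

In a real job the store handle would need to be serializable or created in open(),
and the exactly-once guarantees around the hand-over point need extra care.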

Best, Hequn


On Mon, Oct 1, 2018 at 5:17 PM Fabian Hueske  wrote:

> Hi Ananth,
>
> You can certainly do this with Flink, but there are no built-in operators
> for this.
> What you probably want to do is to compare the timestamp of the event with
> the current processing time and drop the record if it is too old.
> If the timestamp is encoded in the record, you can do this in a
> FilterFunction or a FlatMapFunction. If the timestamp is attached as the
> event-time timestamp, you can access it in a ProcessFunction.
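
A sketch of that filter (LogEvent and its getEventTimeMs() accessor are made-up
placeholders for whatever record type carries the epoch-millis timestamp):

import org.apache.flink.api.common.functions.FilterFunction;

// Drops records whose event timestamp is older than maxAgeMs relative to processing time.
public class DropStaleRecords implements FilterFunction<LogEvent> {

    private final long maxAgeMs;

    public DropStaleRecords(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    @Override
    public boolean filter(LogEvent event) {
        return System.currentTimeMillis() - event.getEventTimeMs() <= maxAgeMs;
    }
}

// Usage (sketch): skip anything older than 20 minutes
//   stream.filter(new DropStaleRecords(20 * 60 * 1000L))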
>
> Best, Fabian
>
> Am Sa., 29. Sep. 2018 um 21:11 Uhr schrieb Ananth Durai <
> vanant...@gmail.com>:
>
>>
>> I'm writing a Flink connector to write a stream of events from Kafka to
>> Elastic Search. It is a typical metrics ingestion pipeline, where the
>> latest metrics preferred over the stale data.
>> What I mean by that, let's assume there was an outage of Elastic Search
>> cluster for about 20 minutes, all the metrics backlogged in Kafka during
>> that period. Once ES is available, the Flink stream will resume from the
>> last offset checkpoint (correctly so) and try to catch up. Instead, is there a
>> way we can customize the Flink stream so that if it detects a significant backlog,
>> skip over and consume from the latest offset, and schedule a separate
>> backfill task for the backlogged 20 minutes?
>>
>> Regards,
>> Ananth.P,
>>
>>
>>
>>
>>
>>


Re: flink-1.6.1-bin-scala_2.11.tgz fails signture and hash verification

2018-10-01 Thread Till Rohrmann
This is unfortunately the realm of the ASF over which we don't have direct
control. We could think about filing an INFRA JIRA ticket to report this
problem (if it can be backtracked).

On Mon, Oct 1, 2018 at 2:42 PM Gianluca Ortelli <
gianl...@mediadistillery.com> wrote:

> Hi Till,
>
> I also believe that it's a problem with a single mirror. It was not a
> blocking problem for me; I just wanted you to be aware of it, in case you
> have some policy regarding the management of mirrors.
>
> Best,
> Gianluca
>
>
> On Mon, 1 Oct 2018 at 14:27, Till Rohrmann  wrote:
>
>> Hi Gianluca,
>>
>> I've downloaded flink-1.6.1-bin-scala_2.11.tgz from here [1] and verified
>> that the shasum512 and the signature are both correct.
>>
>> The only way I could explain this is that either your downloaded
>> artifacts or the mirror you got the binaries from got corrupted.
>>
>> [1] https://dist.apache.org/repos/dist/release/flink/flink-1.6.1/
>>
>> Cheers,
>> Till
>>
>> On Mon, Oct 1, 2018 at 12:07 PM Gianluca Ortelli <
>> gianl...@mediadistillery.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> the mirror is
>>> https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-scala_2.11.tgz
>>>
>>> I just tried a download and the hash is still wrong: it should be
>>>
>>>
>>> d0153bad859e3c2da7e73299837f95670279b3102a0982809f56eb61875908a10120e82bc2dca59972e912c0221cbbfd01f4cd1128a92dd37358e28fe1f76f2f
>>>
>>> but instead it's
>>>
>>>
>>> 9b4ceb7ad59df27ea4c12d15845165dcf64675a26161d502d399234ae237f40f719ab6da4fac7ba210a20aa82364b5ac0377b06a1324e21516198b7a23b5d19c
>>>
>>> Best regards,
>>> Gianluca
>>>
>>>
>>> On Mon, 1 Oct 2018 at 11:59, Fabian Hueske  wrote:
>>>
 Hi Gianluca,

 I tried to validate the issue but hash and signature are OK for me.

 Do you remember which mirror you used to download the binaries?

 Best, Fabian


 Am Sa., 29. Sep. 2018 um 17:24 Uhr schrieb vino yang <
 yanghua1...@gmail.com>:

> Hi Gianluca,
>
> This is very strange; Till may be able to give an explanation, because
> he is the release manager of this version.
>
> Thanks, vino.
>
> Gianluca Ortelli  于2018年9月28日周五
> 下午4:02写道:
>
>> Hi,
>>
>> I just downloaded flink-1.6.1-bin-scala_2.11.tgz from
>> https://flink.apache.org/downloads.html and noticed that it fails
>> signature verification with a
>>
>> gpg: BAD signature from "Till Rohrmann (stsffap) <
>> trohrm...@apache.org>"
>>
>> message. The sha512 hash doesn't match either.
>>
>> I switched to 1.6.0, which verifies OK.
>>
>> Best regards,
>> Gianluca
>>
>


Re: flink-1.6.1-bin-scala_2.11.tgz fails signture and hash verification

2018-10-01 Thread Gianluca Ortelli
Hi Till,

I also believe that it's a problem with a single mirror. It was not a
blocking problem for me; I just wanted you to be aware of it, in case you
have some policy regarding the management of mirrors.

Best,
Gianluca


On Mon, 1 Oct 2018 at 14:27, Till Rohrmann  wrote:

> Hi Gianluca,
>
> I've downloaded flink-1.6.1-bin-scala_2.11.tgz from here [1] and verified
> that the shasum512 and the signature are both correct.
>
> The only way I could explain this is that either your downloaded artifacts
> or the mirror you got the binaries from got corrupted.
>
> [1] https://dist.apache.org/repos/dist/release/flink/flink-1.6.1/
>
> Cheers,
> Till
>
> On Mon, Oct 1, 2018 at 12:07 PM Gianluca Ortelli <
> gianl...@mediadistillery.com> wrote:
>
>> Hi Fabian,
>>
>> the mirror is
>> https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-scala_2.11.tgz
>>
>> I just tried a download and the hash is still wrong: it should be
>>
>>
>> d0153bad859e3c2da7e73299837f95670279b3102a0982809f56eb61875908a10120e82bc2dca59972e912c0221cbbfd01f4cd1128a92dd37358e28fe1f76f2f
>>
>> but instead it's
>>
>>
>> 9b4ceb7ad59df27ea4c12d15845165dcf64675a26161d502d399234ae237f40f719ab6da4fac7ba210a20aa82364b5ac0377b06a1324e21516198b7a23b5d19c
>>
>> Best regards,
>> Gianluca
>>
>>
>> On Mon, 1 Oct 2018 at 11:59, Fabian Hueske  wrote:
>>
>>> Hi Gianluca,
>>>
>>> I tried to validate the issue but hash and signature are OK for me.
>>>
>>> Do you remember which mirror you used to download the binaries?
>>>
>>> Best, Fabian
>>>
>>>
>>> Am Sa., 29. Sep. 2018 um 17:24 Uhr schrieb vino yang <
>>> yanghua1...@gmail.com>:
>>>
 Hi Gianluca,

 This is very strange; Till may be able to give an explanation, because
 he is the release manager of this version.

 Thanks, vino.

 Gianluca Ortelli  于2018年9月28日周五 下午4:02写道:

> Hi,
>
> I just downloaded flink-1.6.1-bin-scala_2.11.tgz from
> https://flink.apache.org/downloads.html and noticed that it fails
> signature verification with a
>
> gpg: BAD signature from "Till Rohrmann (stsffap)  >"
>
> message. The sha512 hash doesn't match either.
>
> I switched to 1.6.0, which verifies OK.
>
> Best regards,
> Gianluca
>



Re: flink-1.6.1-bin-scala_2.11.tgz fails signture and hash verification

2018-10-01 Thread Till Rohrmann
Hi Gianluca,

I've downloaded flink-1.6.1-bin-scala_2.11.tgz from here [1] and verified
that the shasum512 and the signature are both correct.

The only way I could explain this is that either your downloaded artifacts
or the mirror you got the binaries from got corrupted.

[1] https://dist.apache.org/repos/dist/release/flink/flink-1.6.1/

Cheers,
Till

On Mon, Oct 1, 2018 at 12:07 PM Gianluca Ortelli <
gianl...@mediadistillery.com> wrote:

> Hi Fabian,
>
> the mirror is
> https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-scala_2.11.tgz
>
> I just tried a download and the hash is still wrong: it should be
>
>
> d0153bad859e3c2da7e73299837f95670279b3102a0982809f56eb61875908a10120e82bc2dca59972e912c0221cbbfd01f4cd1128a92dd37358e28fe1f76f2f
>
> but instead it's
>
>
> 9b4ceb7ad59df27ea4c12d15845165dcf64675a26161d502d399234ae237f40f719ab6da4fac7ba210a20aa82364b5ac0377b06a1324e21516198b7a23b5d19c
>
> Best regards,
> Gianluca
>
>
> On Mon, 1 Oct 2018 at 11:59, Fabian Hueske  wrote:
>
>> Hi Gianluca,
>>
>> I tried to validate the issue but hash and signature are OK for me.
>>
>> Do you remember which mirror you used to download the binaries?
>>
>> Best, Fabian
>>
>>
>> Am Sa., 29. Sep. 2018 um 17:24 Uhr schrieb vino yang <
>> yanghua1...@gmail.com>:
>>
>>> Hi Gianluca,
>>>
>>> This is very strange; Till may be able to give an explanation, because
>>> he is the release manager of this version.
>>>
>>> Thanks, vino.
>>>
>>> Gianluca Ortelli  于2018年9月28日周五 下午4:02写道:
>>>
 Hi,

 I just downloaded flink-1.6.1-bin-scala_2.11.tgz from
 https://flink.apache.org/downloads.html and noticed that it fails
 signature verification with a

 gpg: BAD signature from "Till Rohrmann (stsffap) >>> >"

 message. The sha512 hash doesn't match either.

 I switched to 1.6.0, which verifies OK.

 Best regards,
 Gianluca

>>>


Re: Upgrade Flink with newer Java version

2018-10-01 Thread Chesnay Schepler

Please see https://issues.apache.org/jira/browse/FLINK-8033.

On 01.10.2018 13:39, Georgi Stoyanov wrote:


Hi,

Oracle will stop supporting Java 8 in January 2019. Do you guys plan to
upgrade the version?


If so, do you have a ticket which we can watch for updates?

Regards,

G. Stoyanov





Upgrade Flink with newer Java version

2018-10-01 Thread Georgi Stoyanov
Hi,

Oracle will stop supporting Java 8 in January 2019. Do you guys plan to upgrade
the version?
If so, do you have a ticket which we can watch for updates?

Regards,
G. Stoyanov


Re: In-Memory Lookup in Flink Operators

2018-10-01 Thread David Anderson
Hi Chirag,

The community is also looking at an approach that involves using
Bravo[1][2] to bootstrap state by loading the initial version of the state
into a savepoint.

[1] https://github.com/king/bravo
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Proposal-Utilities-for-reading-transforming-and-creating-Streaming-savepoints-td23843.html#a23854

On Mon, Oct 1, 2018 at 11:27 AM Fabian Hueske  wrote:

> Hi Chirag,
>
> Flink 1.5.0 added support for BroadcastState which should address your
> requirement of replicating the data.  [1]
> The replicated data is stored in the configured state backend which can
> also be RocksDB.
>
> Regarding the reload, I would recommend Lasse's approach of having a
> custom source that pushes data in regular intervals instead.
> One problem is that it is not possible to pause a stream until all data is
> loaded. Instead, you would need to buffer that data in state as well and
> work with start and end markers on the broadcast stream.
>
> Best, Fabian
>
> [1]
> https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
>
>
> Am So., 30. Sep. 2018 um 10:48 Uhr schrieb Chirag Dewan <
> chirag.dewa...@yahoo.in>:
>
>> Thanks Lasse, that is rightly put. That's the only solution I can think
>> of too.
>>
>> The only thing I can't get my head around is using the coMap and
>> coFlatMap functions with such a stream. Since they don't support side
>> outputs, is there a way my lookup map/flatMap function can simply consume a
>> stream?
>>
>> Ken, that's an interesting solution actually. Is there any chance you need
>> to update the memory-loaded data too?
>>
>> Thanks,
>>
>> Chirag
>>
>> On Sunday, 30 September, 2018, 5:17:51 AM IST, Ken Krugler <
>> kkrugler_li...@transpac.com> wrote:
>>
>>
>> Hi Lasse,
>>
>> One approach I’ve used in a similar situation is to have a
>> “UnionedSource” wrapper that first emits the (bounded) data that will be
>> loaded in-memory, and then starts running the source that emits the
>> continuous stream of data.
>>
>> This outputs an Either, which I then split, and broadcast the A,
>> and key/partition the B.
>>
>> You could do something similar, but occasionally keep checking if there’s
>> more  data vs. assuming it’s bounded.
>>
>> The main issue I ran into is that it doesn’t seem possible to do
>> checkpointing, or at least I couldn’t think of a way to make this work
>> properly.
>>
>> — Ken
>>
>>
>> On Sep 27, 2018, at 9:50 PM, Lasse Nedergaard 
>> wrote:
>>
>> Hi.
>>
>> We have created our own database source that polls the data at a
>> configured interval. We then use a co-process function. It takes two inputs:
>> one from our database and one from our data input. It requires that you keyBy
>> the attributes you use for the lookup in your map function.
>> Delaying your data input until your database lookup is done the first time is
>> not simple, but a simple solution could be to implement a delay operation or
>> keep the data in your process function until data arrives from your database
>> stream.
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>
>> Den 28. sep. 2018 kl. 06.28 skrev Chirag Dewan :
>>
>> Hi,
>>
>> I saw "static/dynamic lookups in flink streaming" being discussed on the
>> Apache Flink User Mailing List archive, and then I saw this FLIP
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>> .
>>
>> I know we haven't made much progress on this topic. I still wanted to put
>> forward my problem statement around this.
>>
>> I am also looking for a dynamic lookup in Flink operators. I actually
>> want to pre-fetch various Data Sources, like DB, Filesystem, Cassandra etc.
>> into memory. Along with that, I have to ensure a refresh of in-memory
>> lookup table periodically. The period being a configurable parameter.
>>
>> This is what a map operator would look like with lookup:
>>
>> -> Load in-memory lookup - Refresh timer start
>> -> Stream processing start
>> -> Call lookup
>> -> Use lookup result in Stream processing
>> -> Timer elapsed -> Reload lookup data source into in-memory table
>> -> Continue processing
>>
>>
>> My concerns around these are:
>>
>> 1) Possibly storing the same copy of data in every Task slots memory or
>> state backend(RocksDB in my case).
>> 2) Having a dedicated refresh thread for each subtask instance (possibly
>> every Task Manager having multiple refresh threads)
>>
>> Am I thinking in the right direction? Or missing something very obvious?
>> It is confusing.
>>
>> Any leads are much appreciated. Thanks in advance.
>>
>> Cheers,
>> Chirag
>>
>>
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>>
>>

-- 
*David Anderson* | Training Coordinator | data Artisans
--
Join 

Re: Superstep-like synchronization of streaming iteration

2018-10-01 Thread Paris Carbone
Hi Christian,

It is great to see iterative use cases; thanks for sharing your problem!

Superstep-based iterative BSP synchronization for streams is a problem we have been 
looking into extensively; however, this functionality is not yet standardised 
in Flink.
I think your use case is fully covered by our proposed approach, described in a 
research talk at Flink Forward 18 in Berlin [1] (there is probably a video 
available too on the dataArtisans website). 
Take a look, and in case this approach satisfies your needs and you would like 
to test your application with our current prototype, please do PM me!

Paris

[1] 
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2018-paris-carbone-stream-loops-on-flink-reinventing-the-wheel-for-the-streaming-era

> On 29 Sep 2018, at 20:51, Christian Lehner  
> wrote:
> 
> Hi all,
> 
> 
> if you don't want to read the wall of text below, in short, I want to know if 
> it is possible to get a superstep-based iteration on a possibly unbounded 
> DataStream in Flink in an efficient way and what general concept(s) of 
> synchronization you would suggest for that.
> 
> 
> I would like to write a program that has different vertices (realized just as 
> Longs for now) in a graph which all store a keyed state and communicate with 
> each other with messages that arrive in an iterated stream.
> 
> From the outside I would only get the messages to add (or, possibly in the 
> future, delete, however that can be ignored for now) a certain vertex with 
> some possible additional information specified by the program (this message 
> can be assumed to have the same form as any other message) and then the rest 
> would happen through an iterated stream keyed by the vertex to which the 
> message is addressed in which a vertex through a KeyedProcessFunction (or 
> KeyedBroadcastProcessFunction if a BroadcastStream is used for 
> synchronization) can send new messages to any other vertex (or itself) based 
> on the received message(s) and its own current state and can also update its 
> state based on the received message(s). The new messages would then be fed 
> back into the iterated stream. If no synchronization is done this works quite 
> well, however it doesn't produce helpful results for my problem since no 
> order in which the messages arrive can be guaranteed.
> 
> What I would optimally like to have is a pregel-like superstep-based 
> iteration which runs on a batch of outside messages (here: vertex additions) 
> until no more messages are produced and after that repeats that with the next 
> batch of vertices either infinitely or until there are no more new messages 
> received. During the execution of each batch all vertices (including older 
> ones) can be activated again by receiving a message and the state of each 
> vertex should be preserved throughout the execution of the program. The 
> problem lies in how I can separate the messages into supersteps in an 
> iterative partitioned stream similar to the iterations in the DataSet API.
> 
> One idea I had was just making tumbling windows of a large enough amount of 
> time which would just collect all the messages and then emit them in a 
> ProcessWindowFunction once the window fires. While this would be quite a 
> simple solution that requires little non-parallel synchonization and it would 
> obviously require that we know such a time in which we can be guaranteed that 
> all messages have been processed and all new messages for the next superstep 
> produced which is realistically not the case. It would also mean that in most 
> supersteps the program would wait longer than necessary until it starts the 
> next superstep. Fault tolerance would also be very hard to achieve.
> 
> Another more complex idea was to just globally synchronize with an object 
> that remembers which vertices have been sent messages in the previous 
> superstep by being informed before any message is sent and then is also 
> informed when a vertex is done with processing a message and informs the 
> vertex if there globally are no more messages to be processed. If that is the 
> case the vertex then sends a NextSuperstep message which is broadcast to all 
> partitions with a BroadcastStream. After that all vertices can start with 
> processing all messages sent to them in the previous superstep. Other than 
> not being trivially to synchronize without any problems (which I'm working on 
> myself) this approach has the obvious disadvantage that a lot of information 
> has to be passed to this object in a globally synchronized manner which kind 
> of kills the point of parallel processing. Although it is obvious that some 
> global synchronization probably has to take place this approach seems rather 
> ineffective to me.
> 
> Since I haven't been working with Flink for very long, although I have 
> used it intensively for the past couple of weeks and read all relevant 
> documentation I could find, I would like to ask if 

Re: flink-1.6.1-bin-scala_2.11.tgz fails signture and hash verification

2018-10-01 Thread Gianluca Ortelli
Hi Fabian,

the mirror is
https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-scala_2.11.tgz

I just tried a download and the hash is still wrong: it should be

d0153bad859e3c2da7e73299837f95670279b3102a0982809f56eb61875908a10120e82bc2dca59972e912c0221cbbfd01f4cd1128a92dd37358e28fe1f76f2f

but instead it's

9b4ceb7ad59df27ea4c12d15845165dcf64675a26161d502d399234ae237f40f719ab6da4fac7ba210a20aa82364b5ac0377b06a1324e21516198b7a23b5d19c

Best regards,
Gianluca


On Mon, 1 Oct 2018 at 11:59, Fabian Hueske  wrote:

> Hi Gianluca,
>
> I tried to validate the issue but hash and signature are OK for me.
>
> Do you remember which mirror you used to download the binaries?
>
> Best, Fabian
>
>
> Am Sa., 29. Sep. 2018 um 17:24 Uhr schrieb vino yang <
> yanghua1...@gmail.com>:
>
>> Hi Gianluca,
>>
>> This is very strange; Till may be able to give an explanation, because he
>> is the release manager of this version.
>>
>> Thanks, vino.
>>
>> Gianluca Ortelli  于2018年9月28日周五 下午4:02写道:
>>
>>> Hi,
>>>
>>> I just downloaded flink-1.6.1-bin-scala_2.11.tgz from
>>> https://flink.apache.org/downloads.html and noticed that it fails
>>> signature verification with a
>>>
>>> gpg: BAD signature from "Till Rohrmann (stsffap) "
>>>
>>> message. The sha512 hash doesn't match either.
>>>
>>> I switched to 1.6.0, which verifies OK.
>>>
>>> Best regards,
>>> Gianluca
>>>
>>


Re: flink-1.6.1-bin-scala_2.11.tgz fails signture and hash verification

2018-10-01 Thread Fabian Hueske
Hi Gianluca,

I tried to validate the issue but hash and signature are OK for me.

Do you remember which mirror you used to download the binaries?

Best, Fabian


Am Sa., 29. Sep. 2018 um 17:24 Uhr schrieb vino yang :

> Hi Gianluca,
>
> This is very strange; Till may be able to give an explanation, because he
> is the release manager of this version.
>
> Thanks, vino.
>
> Gianluca Ortelli  于2018年9月28日周五 下午4:02写道:
>
>> Hi,
>>
>> I just downloaded flink-1.6.1-bin-scala_2.11.tgz from
>> https://flink.apache.org/downloads.html and noticed that it fails
>> signature verification with a
>>
>> gpg: BAD signature from "Till Rohrmann (stsffap) "
>>
>> message. The sha512 hash doesn't match either.
>>
>> I switched to 1.6.0, which verifies OK.
>>
>> Best regards,
>> Gianluca
>>
>


Re: I want run flink program in ubuntu x64 Mult Node Cluster what is configuration?

2018-10-01 Thread Fabian Hueske
Hi,

these issues are not related to Flink but rather generic Linux / bash
issues.
Ensure that the start scripts are executable (this can be changed with chmod)
and that your user has the right permissions to execute them.
Also, you have to use the right path to the scripts. If you are in the base
folder of the Flink setup, the scripts are located in the bin folder, i.e.,
you can run them with ./bin/start-cluster.sh

Best, Fabian

Am So., 30. Sep. 2018 um 13:02 Uhr schrieb Mar_zieh <
m.marzieh.ghas...@gmail.com>:

> I run "sudo /start-cluster.sh" but I got this error:
>
> sudo: /start-cluster.sh : command not found.
>
> Would you please tell me for just one node, is it necessary to configure
> it?
>
> Thank you
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Regarding implementation of aggregate function using a ProcessFunction

2018-10-01 Thread Fabian Hueske
Hi,

There are basically three options:
1) Use an AggregateFunction and store everything that you would put into
state into the Accumulator. This can become quite expensive because the
Accumulator is de/serialized for every function call if you use RocksDB.
The advantage is that you don't have to store all records in state but only
the data you need. Simple aggregations like COUNT or SUM are quite cheap.
2) Use Flink's window primitives and a ProcessWindowFunction. In this
case, all records of a window are stored in a ListState. Adding a record to
the list is cheap, but the state might grow quite large for longer windows.
When the window is evaluated, all records are loaded into memory and
iterated over by the ProcessWindowFunction.
3) Implement the windowing logic in a ProcessFunction. This requires a lot
of additional logic, depending on what types of windows you want to support.

Flink's SQL / Table API implements the first approach.

Best, Fabian

Am So., 30. Sep. 2018 um 12:48 Uhr schrieb Gaurav Luthra <
gauravluthra6...@gmail.com>:

> Hi ken,
>
> Mine is a very generic use case. I am building an aggregation function
> using Flink which can be configured for any use case.
> It will not be tied to a specific use case; every user can plug in
> their business logic and use this aggregator to get results.
> As for windowing, the user can configure the type of window and my
> aggregator will ask for the required properties of that window.
>
> I hope you get the idea.
>
> But to make it generic I need to use a ProcessFunction and the process() method
> to implement it, instead of the more specific AggregateFunction and aggregate()
> method.
>
> So, I am looking for input on whether anyone has tried implementing aggregation
> using a ProcessFunction and the process() method, as this is very much a needed
> feature in Flink.
>
> Thanks and Regards,
> Gaurav Luthra
> Mob:- +91-9901945206
>
>
> On Sun, Sep 30, 2018 at 5:12 AM Ken Krugler 
> wrote:
>
>> Hi Gaurav,
>>
>> I’m curious - for your use case, what are the windowing & aggregation
>> requirements?
>>
>> E.g. is it a 10 second sliding window?
>>
>> And what’s the aggregation you’re trying to do?
>>
>> Thanks,
>>
>> — Ken
>>
>>
>> On Sep 28, 2018, at 4:00 AM, Gaurav Luthra 
>> wrote:
>>
>> Hi Chesnay,
>>
>> I know it is an issue, and it won't be fixed because of the window merging
>> feature in the case of session windows.
>> But I am looking to see if someone has implemented an aggregation function using
>> a ProcessFunction and the process() method instead of an AggregateFunction and
>> the aggregate() method.
>> I hope you got my point.
>>
>> Thanks & Regards
>> Gaurav Luthra
>>
>>
>>
>> On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler 
>> wrote:
>>
>>> Please see: https://issues.apache.org/jira/browse/FLINK-10250
>>>
>>> On 28.09.2018 11:27, vino yang wrote:
>>>
>>> Hi Gaurav,
>>>
>>> Yes, you are right. It is really not allowed to use a RichFunction. I will
>>> ping Timo; he may give you a more professional answer.
>>>
>>> Thanks, vino.
>>>
>>> Gaurav Luthra  于2018年9月28日周五 下午4:27写道:
>>>
 Hi Vino,

 Kindly check the Flink code below.

 package org.apache.flink.streaming.api.datastream.WindowedStream

 @PublicEvolving
 public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
     checkNotNull(function, "function");

     if (function instanceof RichFunction) {
         throw new UnsupportedOperationException(
             "This aggregation function cannot be a RichFunction.");
     }

     TypeInformation<ACC> accumulatorType =
         TypeExtractor.getAggregateFunctionAccumulatorType(
             function, input.getType(), null, false);

     TypeInformation<R> resultType =
         TypeExtractor.getAggregateFunctionReturnType(
             function, input.getType(), null, false);

     return aggregate(function, accumulatorType, resultType);
 }


 Kindly check the above snippet of Flink's aggregate() method, which is
 applied on a windowed stream.

 Thanks & Regards
 Gaurav Luthra
 Mob:- +91-9901945206


 On Fri, Sep 28, 2018 at 1:40 PM vino yang 
 wrote:

> Hi Gaurav,
>
> This is very strange, can you share your code and specific exceptions?
> Under normal circumstances, it should not throw an exception.
>
> Thanks, vino.
>
> Gaurav Luthra wrote on Fri, Sep 28, 2018 at 3:27 PM:
>
>> Hi Vino,
>>
>> A RichAggregateFunction can surely access state. But the problem
>> is, in the aggregate() method we cannot use a RichAggregateFunction.
>> If we use one, it throws an exception.
>>
>> So, the option is to use an AggregateFunction (not Rich) with the
>> aggregate() method on the windowed stream. Now, in an AggregateFunction, we
>> cannot access the RuntimeContext. Hence we cannot use state.
>>
>> Thanks & Regards
>> Gaurav
>>
>>
>>
>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, 
>> wrote:
>>
>>> Hi Gaurav,
>>>
>>> Why do you 

Re: In-Memory Lookup in Flink Operators

2018-10-01 Thread Fabian Hueske
Hi Chirag,

Flink 1.5.0 added support for BroadcastState, which should address your
requirement of replicating the data. [1]
The replicated data is stored in the configured state backend, which can
also be RocksDB.

Regarding the reload, I would recommend Lasse's approach of having a custom
source that pushes data at regular intervals instead.
One problem is that it is not possible to pause a stream until all data is
loaded. Instead, you would need to buffer that data in state as well and
work with start and end markers on the broadcast stream.
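
A minimal sketch of the broadcast-state pattern (assuming DataStream variables
events and lookupSource, a String key, and placeholder types MyEvent,
LookupEntry and EnrichedEvent) could look like this:

MapStateDescriptor<String, LookupEntry> lookupDescriptor =
    new MapStateDescriptor<>("lookup", String.class, LookupEntry.class);

BroadcastStream<LookupEntry> broadcastLookup = lookupSource.broadcast(lookupDescriptor);

events
    .keyBy(e -> e.getKey())
    .connect(broadcastLookup)
    .process(new KeyedBroadcastProcessFunction<String, MyEvent, LookupEntry, EnrichedEvent>() {

        @Override
        public void processElement(MyEvent event, ReadOnlyContext ctx,
                Collector<EnrichedEvent> out) throws Exception {
            // read-only access to the replicated data
            LookupEntry entry = ctx.getBroadcastState(lookupDescriptor).get(event.getKey());
            out.collect(new EnrichedEvent(event, entry)); // entry may still be null before the first broadcast
        }

        @Override
        public void processBroadcastElement(LookupEntry entry, Context ctx,
                Collector<EnrichedEvent> out) throws Exception {
            // every parallel instance receives the element and refreshes its broadcast state
            ctx.getBroadcastState(lookupDescriptor).put(entry.getKey(), entry);
        }
    });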

Best, Fabian

[1]
https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink


Am So., 30. Sep. 2018 um 10:48 Uhr schrieb Chirag Dewan <
chirag.dewa...@yahoo.in>:

> Thanks Lasse, that is rightly put. That's the only solution I can think of
> too.
>
> The only thing I can't get my head around is using the coMap and
> coFlatMap functions with such a stream. Since they don't support side
> outputs, is there a way my lookup map/flatMap function can simply consume a
> stream?
>
> Ken, thats an interesting solution actually. Is there any chance you need
> to update the memory-loaded data too?
>
> Thanks,
>
> Chirag
>
> On Sunday, 30 September, 2018, 5:17:51 AM IST, Ken Krugler <
> kkrugler_li...@transpac.com> wrote:
>
>
> Hi Lasse,
>
> One approach I’ve used in a similar situation is to have a “UnionedSource”
> wrapper that first emits the (bounded) data that will be loaded in-memory,
> and then starts running the source that emits the continuous stream of data.
>
> This outputs an Either, which I then split, and broadcast the A, and
> key/partition the B.
>
> You could do something similar, but occasionally keep checking if there’s
> more data vs. assuming it’s bounded.
>
> The main issue I ran into is that it doesn’t seem possible to do
> checkpointing, or at least I couldn’t think of a way to make this work
> properly.
>
> — Ken
>
>
> On Sep 27, 2018, at 9:50 PM, Lasse Nedergaard 
> wrote:
>
> Hi.
>
> We have created our own database source that polls the data at a
> configured interval. We then use a co-process function. It takes two inputs:
> one from our database and one from our data input. It requires that you keyBy
> the attributes you use for the lookup in your map function.
> Delaying your data input until your database lookup is done the first time is
> not simple, but a simple solution could be to implement a delay operation or
> keep the data in your process function until data arrives from your database
> stream.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 28. sep. 2018 kl. 06.28 skrev Chirag Dewan :
>
> Hi,
>
> I saw "static/dynamic lookups in flink streaming" being discussed on the
> Apache Flink User Mailing List archive, and then I saw this FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>
> I know we haven't made much progress on this topic. I still wanted to put
> forward my problem statement around this.
>
> I am also looking for a dynamic lookup in Flink operators. I actually want
> to pre-fetch various Data Sources, like DB, Filesystem, Cassandra etc. into
> memory. Along with that, I have to ensure a refresh of the in-memory lookup
> table periodically, the period being a configurable parameter.
>
> This is what a map operator would look like with lookup:
>
> -> Load in-memory lookup - Refresh timer start
> -> Stream processing start
> -> Call lookup
> -> Use lookup result in Stream processing
> -> Timer elapsed -> Reload lookup data source into in-memory table
> -> Continue processing
>
>
>  My concern around these are :
>
> 1) Possibly storing the same copy of data in every task slot's memory or
> state backend (RocksDB in my case).
> 2) Having a dedicated refresh thread for each subtask instance (possibly
> every Task Manager having multiple refresh threads)
>
> Am I thinking in the right direction? Or am I missing something very obvious?
> It is confusing.
>
> Any leads are much appreciated. Thanks in advance.
>
> Cheers,
> Chirag
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: Flink Scheduler Customization

2018-10-01 Thread Fabian Hueske
Hi Ananth,

You can certainly do this with Flink, but there are no built-in operators
for this.
What you probably want to do is to compare the timestamp of the event with
the current processing time and drop the record if it is too old.
If the timestamp is encoded in the record, you can do this in a
FilterFunction or a FlatMapFunction. If the timestamp is attached as the
event-time timestamp, you can access it in a ProcessFunction.
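
A minimal sketch of the FilterFunction variant (assuming the record type
MetricEvent and its getEventTime() accessor, both placeholder names) could
look like this:

DataStream<MetricEvent> fresh = events.filter(new FilterFunction<MetricEvent>() {
    @Override
    public boolean filter(MetricEvent e) {
        // drop records that are older than 20 minutes compared to processing time
        return System.currentTimeMillis() - e.getEventTime() <= 20 * 60 * 1000L;
    }
});

// With event-time timestamps, a ProcessFunction can read ctx.timestamp() instead.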

Best, Fabian

Am Sa., 29. Sep. 2018 um 21:11 Uhr schrieb Ananth Durai :

>
> I'm writing a Flink connector to write a stream of events from Kafka to
> Elasticsearch. It is a typical metrics ingestion pipeline, where the
> latest metrics are preferred over stale data.
> What I mean by that: let's assume there was an outage of the Elasticsearch
> cluster for about 20 minutes, and all the metrics backlogged in Kafka during
> that period. Once ES is available, the Flink stream will resume from the
> last offset checkpoint (correctly so) and try to catch up. Instead, is there a
> way we can customize the Flink stream so that, if it detects a significant backlog,
> it skips over and consumes from the latest offset, and schedules a separate
> backfill task for the backlogged 20 minutes?
>
> Regards,
> Ananth.P,
>
>
>
>
>
>


Re: Data loss when restoring from savepoint

2018-10-01 Thread Juho Autio
Hi Andrey,

To rule out for good any questions about sink behaviour, the job was killed
and started with an additional Kafka sink.

The same number of ids were missed in both outputs: KafkaSink &
BucketingSink.

I wonder what would be the next steps in debugging?

On Fri, Sep 21, 2018 at 3:49 PM Juho Autio  wrote:

> Thanks, Andrey.
>
> > so it means that the savepoint does not lose at least some dropped
> records.
>
> I'm not sure what you mean by that? I mean, it was known from the
> beginning, that not everything is lost before/after restoring a savepoint,
> just some records around the time of restoration. It's not 100% clear
> whether records are lost before making a savepoint or after restoring it.
> Although, based on the new DEBUG logs it seems more like losing some
> records that are seen ~soon after restoring. It seems like Flink is
> somehow confused about the restored state vs. new inserts to state.
> This could also be somehow linked to the high back pressure on the kafka
> source while the stream is catching up.
>
> > If it is feasible for your setup, I suggest to insert one more map
> function after reduce and before sink.
> > etc.
>
> Isn't that the same thing that we discussed before? Nothing is sent to
> BucketingSink before the window closes, so I don't see how it would make
> any difference if we replace the BucketingSink with a map function or
> another sink type. We don't create or restore savepoints during the time
> when BucketingSink gets input or has open buckets – that happens at a much
> later time of day. I would focus on figuring out why the records are lost
> while the window is open. But I don't know how to do that. Would you have
> any additional suggestions?
>
> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin 
> wrote:
>
>> Hi Juho,
>>
>> so it means that the savepoint does not lose at least some dropped
>> records.
>>
>> If it is feasible for your setup, I suggest to insert one more map
>> function after reduce and before sink.
>> The map function should be called right after window is triggered but
>> before flushing to s3.
>> The result of reduce (deduped record) could be logged there.
>> This should allow to check whether the processed distinct records were
>> buffered in the state after the restoration from the savepoint or not. If
>> they were buffered we should see that there was an attempt to write them to
>> the sink from the state.
>>
>> Another suggestion is to try to write records to some other sink or to
>> both.
>> E.g. if you can access the file system of the workers, maybe write just into
>> local files and check whether the records are also dropped there.
>>
>> Best,
>> Andrey
>>
>> On 20 Sep 2018, at 15:37, Juho Autio  wrote:
>>
>> Hi Andrey!
>>
>> I was finally able to gather the DEBUG logs that you suggested. In short,
>> the reducer logged that it processed at least some of the ids that were
>> missing from the output.
>>
>> "At least some", because I didn't have the job running with DEBUG logs
>> for the full 24-hour window period. So I was only able to look up if I can
>> find *some* of the missing ids in the DEBUG logs. Which I did indeed.
>>
>> I changed the DistinctFunction.java to do this:
>>
>> @Override
>> public Map reduce(Map value1,
>> Map value2) {
>> LOG.debug("DistinctFunction.reduce returns: {}={}",
>> value1.get("field"), value1.get("id"));
>> return value1;
>> }
>>
>> Then:
>>
>> vi flink-1.6.0/conf/log4j.properties
>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>
>> Then I ran the following kind of test:
>>
>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC
>> 2018
>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from
>> that previous cluster's savepoint
>> - Ran until caught up offsets
>> - Cancelled the job with a new savepoint
>> - Started a new job _without_ DEBUG, which restored the new savepoint,
>> let it keep running so that it will eventually write the output
>>
>> Then on the next day, after results had been flushed when the 24-hour
>> window closed, I compared the results again with a batch version's output.
>> And found some missing ids as usual.
>>
>> I drilled down to one specific missing id (I'm replacing the actual value
>> with AN12345 below), which was not found in the stream output, but was
>> found in batch output & flink DEBUG logs.
>>
>> Related to that id, I gathered the following information:
>>
>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>
>> 2018-09-18 09:14:29,085 missing id is processed for the first time,
>> proved by this log line:
>> 2018-09-18 09:14:29,085 DEBUG
>> com.rovio.ds.flink.uniqueid.DistinctFunction  -
>> DistinctFunction.reduce returns: s.aid1=AN12345
>>
>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>
>> 

Re: [DISCUSS] Dropping flink-storm?

2018-10-01 Thread Fabian Hueske
+1 to drop it.

Thanks, Fabian

Am Sa., 29. Sep. 2018 um 12:05 Uhr schrieb Niels Basjes :

>  I would drop it.
>
> Niels Basjes
>
> On Sat, 29 Sep 2018, 10:38 Kostas Kloudas, 
> wrote:
>
> > +1 to drop it as nobody seems to be willing to maintain it and it also
> > stands in the way for future developments in Flink.
> >
> > Cheers,
> > Kostas
> >
> > > On Sep 29, 2018, at 8:19 AM, Tzu-Li Chen  wrote:
> > >
> > > +1 to drop it.
> > >
> > > It seems few people use it. A sparse commit history for an experimental
> > > module often means that there is low interest.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > 远远 wrote on Sat, Sep 29, 2018 at 2:16 PM:
> > >
> > >> +1, it‘s time to drop it
> > >>
> > >> Zhijiang(wangzhijiang999) wrote on Sat, Sep 29, 2018 at 1:53 PM:
> > >>
> > >>> I very much agree with dropping it. +1
> > >>>
> > >>> --
> > >>> From: Jeff Carter
> > >>> Sent: Saturday, Sep 29, 2018 10:18
> > >>> To: dev
> > >>> Cc: chesnay; Till Rohrmann <trohrm...@apache.org>; user
> > >>> Subject: Re: [DISCUSS] Dropping flink-storm?
> > >>>
> > >>> +1 to drop it.
> > >>>
> > >>> On Fri, Sep 28, 2018, 7:25 PM Hequn Cheng 
> > wrote:
> > >>>
> >  Hi,
> > 
> >  +1 to drop it. It seems that few people use it.
> > 
> >  Best, Hequn
> > 
> >  On Fri, Sep 28, 2018 at 10:22 PM Chesnay Schepler <
> ches...@apache.org
> > >
> >  wrote:
> > 
> > > I'm very much in favor of dropping it.
> > >
> > > Flink has been continually growing in terms of features, and IMO
> > we've
> > > reached the point where we should cull some of the more obscure
> ones.
> > >>>
> > > flink-storm, while interesting from a theoretical standpoint,
> offers
> > too
> > > little value.
> > >
> > >>>
> > > Note that the bolt/spout wrapper parts of the part are still
> > compatible,
> > > it's only topologies that aren't working.
> > >
> > > IMO compatibility layers only add value if they ease the migration
> to
> > > Flink APIs.
> > >>>
> > > * bolt/spout wrappers do this, but they will continue to work even
> > if we
> > > drop it
> > > * topologies don't do this, so I'm not interested in them.
> > >
> > > On 28.09.2018 15:22, Till Rohrmann wrote:
> > >> Hi everyone,
> > >>
> > >> I would like to discuss how to proceed with Flink's storm
> > >> compatibility layer flink-storm.
> > >>
> > >> While working on removing Flink's legacy mode, I noticed that some
> > >>>
> > >> parts of flink-storm rely on the legacy Flink client. In fact, at
> > the
> > >>>
> > >> moment flink-storm does not work together with Flink's new
> > distributed
> > >> architecture.
> > >>
> > >> I'm also wondering how many people are actually using Flink's
> Storm
> > >> compatibility layer and whether it would be worth porting it.
> > >>
> > >> I see two options how to proceed:
> > >>
> > >> 1) Commit to maintain flink-storm and port it to Flink's new
> >  architecture
> > >> 2) Drop flink-storm
> > >>
> > >>>
> > >> I doubt that we can contribute it to Apache Bahir [1], because
> once
> > we
> > >>>
> > >> remove the legacy mode, this module will no longer work with all
> > newer
> > >> Flink versions.
> > >>
> > >>>
> > >> Therefore, I would like to hear your opinion on this and in
> > particular
> > >> if you are using or planning to use flink-storm in the future.
> > >>
> > >> [1] https://github.com/apache/bahir-flink
> > >>
> > >> Cheers,
> > >> Till
> > >
> > >
> > >
> > 
> > >>>
> > >>>
> > >>>
> >
> >
>


Re: Streaming to Parquet Files in HDFS

2018-10-01 Thread Fabian Hueske
Hi Bill,

Flink 1.6.0 supports writing Avro records as Parquet files to HDFS via the
previously mentioned StreamingFileSink [1], [2].
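
For reference, a minimal sketch with the StreamingFileSink and the
Parquet/Avro bulk writer (assuming a DataStream<GenericRecord> named stream,
an Avro Schema named schema, and an output path of your choosing; the
flink-parquet and Avro dependencies are required) could look like this:

StreamingFileSink<GenericRecord> parquetSink = StreamingFileSink
    .forBulkFormat(
        new Path("hdfs:///data/parquet-out"),          // target base directory (placeholder)
        ParquetAvroWriters.forGenericRecord(schema))   // writes Avro records as Parquet
    .build();

stream.addSink(parquetSink);
// Note: bulk formats roll the in-progress file on every checkpoint, so checkpointing must be enabled.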

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9753
[2] https://issues.apache.org/jira/browse/FLINK-9750

Am Fr., 28. Sep. 2018 um 23:36 Uhr schrieb hao gao :

> Hi Bill,
>
> I wrote those two medium posts you mentioned above. But clearly, the
> techlab one is much better.
> I would suggest just "close the file when checkpointing" which is the
> easiest way. If you use BucketingSink, you can modify the code to make it
> work. Just replace the code from line 691 to 693 with
> closeCurrentPartFile()
>
> https://github.com/apache/flink/blob/release-1.3.2-rc1/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L691
> This should guarantee exactly-once. You may have some files with an
> underscore prefix when the Flink job fails, but usually those files are
> ignored by the query engine/readers, for example Presto.
>
> If you use 1.6 and later, I think the issue is already addressed
> https://issues.apache.org/jira/browse/FLINK-9750
>
> Thanks
> Hao
>
> On Fri, Sep 28, 2018 at 1:57 PM William Speirs  wrote:
>
>> I'm trying to stream log messages (syslog fed into Kafka) into Parquet
>> files on HDFS via Flink. I'm able to read, parse, and construct objects for
>> my messages in Flink; however, writing to Parquet is tripping me up. I do
>> *not* need to have this be real-time; a delay of a few minutes, even up to
>> an hour, is fine.
>>
>> I've found the following articles talking about this being very difficult:
>> *
>> https://medium.com/hadoop-noob/a-realtime-flink-parquet-data-warehouse-df8c3bd7401
>> * https://medium.com/hadoop-noob/flink-parquet-writer-d127f745b519
>> *
>> https://techlab.bol.com/how-not-to-sink-a-data-stream-to-files-journeys-from-kafka-to-parquet/
>> *
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rolling-sink-parquet-Avro-output-td11123.html
>>
>> All of these posts speak of troubles using the checkpointing mechanism
>> and Parquet's need to perform batch writes. I'm not experienced enough with
>> Flink's check-pointing or Parquet's file format to completely understand
>> the issue. So my questions are as follows:
>>
>> 1) Is this possible in Flink in an exactly-once way? If not, is it
>> possible in a way that _might_ cause duplicates during an error?
>>
>> 2) Is there another/better format to use other than Parquet that offers
>> compression and the ability to be queried by something like Drill or Impala?
>>
>> 3) Any further recommendations for solving the overall problem: ingesting
>> syslogs and writing them to a file(s) that is searchable by an SQL(-like)
>> framework?
>>
>> Thanks!
>>
>> Bill-
>>
>
>
> --
> Thanks
>  - Hao
>


Re: Can't get the FlinkKinesisProducer to work against Kinesalite for tests

2018-10-01 Thread Fabian Hueske
Hi Bruno,

Thanks for sharing your approach!

Best, Fabian

Am Do., 27. Sep. 2018 um 18:11 Uhr schrieb Bruno Aranda :

> Hi again,
>
> We managed at the end to get data into Kinesalite using the
> FlinkKinesisProducer, but to do so, we had to use different configuration,
> such as ignoring the 'aws.endpoint' setting and going for the ones that the
> Kinesis configuration will expect. So, to our FlinkKinesisProducer we pass
> configuration such as:
>
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> producerConfig.put("KinesisEndpoint", "localhost")
> producerConfig.put("KinesisPort", "4567")
> producerConfig.put("VerifyCertificate", "false")
>
> We had to make sure that Kinesalite itself was being started with the
> `--ssl` parameter, in order to use TLS and be available through HTTPS.
>
> And, very importantly as well, our tests run in Docker and we found
> out just before throwing in the towel that for this you cannot use an
> Alpine-based image. If you want the Amazon KPL to work fine, it will need
> to be one of the Debian-based images running in Docker.
>
> Hope this saves someone all the days we have spent looking at it :)
>
> Cheers,
>
> Bruno
>
> On Wed, 26 Sep 2018 at 14:59 Bruno Aranda  wrote:
>
>> Hi,
>>
>> We have started to use Kinesis with Flink and we need to be able to test
>> when a Flink job writes to Kinesis. For that, we use a Docker image with
>> Kinesalite.
>>
>> To configure the producer, we do like it is explained in the docs [1].
>>
>> However, if we use this code, the job submission is going to fail,
>> because the Flink Kinesis connector expects the configuration to have either
>> the endpoint or the region, but not both, or none. (there is a typo in the
>> error message as well where 'aws.region' is mentioned twice) [2].
>>
>> However, if we only specify the endpoint, then the KPL will fail
>> complaining that there is no Region configured. It does look like Kinesis
>> may not be trying to set up the endpoint? We are confused.
>>
>> On the other hand, the Flink consumer works as expected and the endpoint
>> pointing to Kinesalite works fine. The consumer follows a different path
>> and creates the AWS client through a call to AWSUtil [3], which will take
>> the endpoint into account.
>>
>> Are we missing something? We have tried this in Flink versions from 1.3.2
>> to 1.6.1, building the Kinesis connector against the latest KPLs.
>>
>> Any help is appreciated,
>>
>> Thanks!
>>
>> Bruno
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#using-non-aws-kinesis-endpoints-for-testing
>> [2]
>> https://github.com/apache/flink/blob/a36d1999f743d00c4fac8fdf61ad85a4b5d5f3bc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java#L272
>> [3]
>> https://github.com/apache/flink/blob/a36d1999f743d00c4fac8fdf61ad85a4b5d5f3bc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L66
>>
>>


Re: Does Flink SQL "in" operation has length limit?

2018-10-01 Thread Timo Walther

Hi,

tuple should not be used anywhere in flink-table. @Rong can you point us 
to the corresponding code? I haven't looked into the code but we should 
definitely support this query. @Henry feel free to open an issue for it.


Regards,
Timo


Am 28.09.18 um 19:14 schrieb Rong Rong:

Yes.

Thanks for bringing this up Hequn! :-) I think Tuple would not be the 
best container to use.


However, in search for alternative, shouldn't Collection / List be a 
more suitable solution? Row seems to not fit in the context (as there 
can be Rows with elements of different type).
I vaguely recall there was similar JIRA but might not be related to IN 
clause. Let me try to dig it up.


--
Rong

On Fri, Sep 28, 2018 at 9:32 AM Hequn Cheng wrote:


Hi,

I haven't looked into the code. If this is limited by Tuple, would
it be better to implement it with Row?

Best, Hequn

On Fri, Sep 28, 2018 at 9:27 PM Rong Rong <walter...@gmail.com> wrote:

Hi Henry, Vino.

I think IN operator was translated into either a RexSubQuery
or a SqlStdOperatorTable.IN operator.
I think Vino was referring to the first case.
For the second case (I think that's what you are facing here),
they are converted into tuples and the maximum we currently
have in Flink was Tuple25.java, I was wondering if that was
the issue you are facing. You can probably split the IN into
many IN combining with OR.

--
Rong

On Fri, Sep 28, 2018 at 2:33 AM vino yang <yanghua1...@gmail.com> wrote:

Hi Henry,

Maybe the number of elements in your IN clause is out of
range? Its default value is 20, you can modify it with
this configuration item:

*withInSubQueryThreshold(XXX)*

This API comes from Calcite.

Thanks, vino.

徐涛 <happydexu...@gmail.com> wrote on Fri, Sep 28, 2018 at 4:23 PM:

Hi,

 When I am executing the following SQL in flink 1.6.1, some 
error throws out saying that it has a support issue, but when I reduce the 
number of integers in the “in” sentence, for example,

trackId in (124427150,71648998), Flink does not
complain at all, so I wonder is there any length
limit in the “in” operation?

Thanks a lot.

SELECT
    trackId as id,track_title as description, count(*) as cnt
FROM
    play
WHERE
    appName='play.statistics.trace' and
    trackId in (124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
GROUP BY
    HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' MINUTE),trackId,track_title;



FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
FlinkLogicalCalc(expr#0..3=[{inputs}],
started_at_ts=[$t2], trackId=[$t0], track_title=[$t1])
    FlinkLogicalJoin(condition=[=($0, $3)],
joinType=[inner])
FlinkLogicalCalc(expr#0..4=[{inputs}],
expr#5=[_UTF-16LE'play.statistics.trace'],
expr#6=[=($t0, $t5)], trackId=[$t1],
track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
FlinkLogicalNativeTableScan(table=[[play]])
      FlinkLogicalValues(tuples=[[{ 124427150 }, {
71648998 }, { 124493327 }, { 524043 }, { 27300837 }, {
30300481 }, { 27300809 }, { 124744768 }, { 45982512 },
{ 124526566 }, { 124556427 }, { 124804208 }, {
74302264 }, { 119588973 }, { 30496269 }, { 27300288 },
{ 124098818 }, { 125071530 }, { 120918746 }, {
124171456 }, { 30413034 }, { 124888075 }, { 125270551
}, { 125434224 }, { 27300195 }, { 45982342 }, {
45982468 }, { 45982355 }, { 65349883 }, { 124705962 },
{ 65349905 }, { 124298305 }, { 124889583 }, { 45982338
}, { 20506255 }, { 18556415 }, { 122161128 }, {
27299018 }, { 122850375 }, { 124862362 }, { 45982336
}, { 59613202 }, { 122991190 }, { 124590280 }, {
124867563 }, { 45982332 }, { 124515944 }, { 20506257
}, { 122572115 }, { 92083574 }]])

This exception indicates that the query uses an

Re: About the retract of the calculation result of flink sql

2018-10-01 Thread Timo Walther

Hi,

you also need to keep the parallelism in mind. If your downstream
operator or sink has a parallelism of 1 and your SQL query pipeline has
a higher parallelism, the retract results are rebalanced and can arrive in the
wrong order. For example, if you view the changelog in SQL Client, the
built-in SQL Client sink always has parallelism 1.
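
A minimal sketch of keeping the parallelism aligned (assuming variables env,
tEnv, resultTable and a sink called MyRetractSink, all placeholder names)
could look like this:

DataStream<Tuple2<Boolean, Row>> retractStream =
    tEnv.toRetractStream(resultTable, Row.class);

retractStream
    .addSink(new MyRetractSink())
    .setParallelism(env.getParallelism()); // same parallelism as the query, so records are forwarded, not rebalanced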


Regards,
Timo



Am 29.09.18 um 17:02 schrieb Hequn Cheng:

Hi clay,

Are there any other lines after the last line in your picture? The
final result should be eventually consistent and correct.


In your SQL, there is a left join, a keyed group by and a non-keyed
group by. Both the left join and the keyed group by will send
retractions to the downstream non-keyed group by once there is an
update. The retraction messages make the result value fluctuate; however, the
final result will be correct.
To get monotonous results, you can add another non-keyed group by with
max, as sketched below.
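
A rough sketch of that last suggestion (assuming the original query string is
in a variable originalQuery and the aggregated column is called cnt, both
placeholders) could look like this:

Table monotonic = tEnv.sqlQuery(
    "SELECT MAX(cnt) AS max_cnt FROM (" + originalQuery + ") t");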


Best, Hequn.


On Sat, Sep 29, 2018 at 3:47 PM clay wrote:


My final calculation result is written to Kafka in the following way.
Because KafkaTableSink does not support retract mode, I am not
sure whether this method will affect the calculation result.

val userTest: Table = tEnv.sqlQuery(sql)

val endStream = tEnv.toRetractStream[Row](userTest)

//userTest.insertInto("kafkaSink")

val myProducer = new FlinkKafkaProducer011[String](
  kafkaBrokers,             // broker list
  topic,                    // target topic
  new SimpleStringSchema)   // serialization schema

endStream.map(x => {
  s"${x._1}:${x._2.toString}"
}).addSink(myProducer)



--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/