Re: Yahoo Streaming Benchmark on a Flink 1.5 cluster

2018-08-10 Thread vino yang
Hi Namu,

I don't think you need to dig into the internals of the Flink API.
Its public interface is largely backward compatible. If you update the API
dependency version and the corresponding Flink cluster version so that they
are consistent, there should be no problems.
Please ensure that the version of the Flink cluster matches the version of
the API you are compiling against.

Thanks, vino.

Naum Gjorgjeski wrote on Sat, Aug 11, 2018 at 4:25 AM:

> Hi all,
>
>
> I still cannot resolve the problem. Can I please get any advice for it?
> Thank you.
>
>
> Best regards,
>
> Naum Gjorgjeski
>
> --
> *From:* Naum Gjorgjeski 
> *Sent:* Wednesday, August 1, 2018 1:05 AM
> *To:* user@flink.apache.org
> *Subject:* Yahoo Streaming Benchmark on a Flink 1.5 cluster
>
>
> Hi,
>
> I am trying to run the data Artisans version of the Yahoo Streaming
> Benchmark. The benchmark applications are written for Flink 1.0.1.
> However, I need them to run on a Flink 1.5 cluster. When I try to build
> the benchmark applications with any version of Flink from 1.3.0 or higher,
> I get many compile errors. The compile errors state that some of the
> classes and methods cannot be found (because part of the Flink API has
> changed in recent versions).
>
> The classes that cannot be found are:
> org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner
> org.apache.flink.api.common.state.OperatorState
> org.apache.flink.api.common.state.StateBackend
> org.apache.flink.runtime.state.AsynchronousStateHandle
> org.apache.flink.runtime.state.StateHandle
> org.apache.flink.streaming.runtime.tasks.StreamTaskState
>
> org.apache.flink.runtime.state.AbstractStateBackend.CheckpointStateOutputView
>
> The methods that cannot be found are:
>
> org.apache.flink.streaming.api.operators.StreamOperator.snapshotOperatorState(long,long)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStateBackend()
>
> org.apache.flink.runtime.state.AbstractStateBackend.createCheckpointStateOutputView(long,long)
>
> I was able to find a substitution for a few of them (e.g. using
> FlinkFixedPartitioner instead of FixedPartitioner), but for most of them
> there are no straightforward substitutions. Could you please give me some
> advice on how to resolve this problem? Thank you in advance.
>
> Best regards,
> Naum
>
>


Re: flink requires table key when insert into upsert table sink

2018-08-10 Thread Hequn Cheng
Hi,

*> Could you give an example that the query has a unique key?*

Consider the following SQL:

SELECT a, SUM(b) AS d
FROM Orders
GROUP BY a

The result table has a unique key on field a. The documentation about Streaming
Concepts [1] may be helpful for you.

*> What mechanism does Flink use to infer which field(s) form the unique key?*

Currently (Flink 1.6.0), Flink SQL derives unique keys only from GROUP BY,
and the unique-key information can be passed on to the downstream operators,
for example the SELECT.

*Implement a RetractStreamTableSink*

Since outer joins produce updates without a unique key, you can use a
RetractStreamTableSink to output the data. There is documentation about
implementing a table sink [2].
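
For reference, a minimal sketch of such a sink (MyRetractSink and the print()
call are placeholders, assuming Row records; replace the emitDataStream body
with writes to your external system):

import java.lang.{Boolean => JBool}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.sinks.RetractStreamTableSink
import org.apache.flink.types.Row

class MyRetractSink extends RetractStreamTableSink[Row] {

  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def getRecordType: TypeInformation[Row] =
    new RowTypeInfo(fieldTypes, fieldNames)

  override def getFieldNames: Array[String] = fieldNames

  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

  override def configure(
      names: Array[String],
      types: Array[TypeInformation[_]]): MyRetractSink = {
    val copy = new MyRetractSink
    copy.fieldNames = names
    copy.fieldTypes = types
    copy
  }

  // Each element is a (flag, row) pair: true = add, false = retract.
  override def emitDataStream(ds: DataStream[JTuple2[JBool, Row]]): Unit = {
    ds.print()
  }
}

You can then emit the result table with table.writeToSink(new MyRetractSink).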

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#streaming-concepts
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink


On Sat, Aug 11, 2018 at 6:02 AM, 徐涛  wrote:

> Hi Fabian,
> Could you give an example of a query that has a unique key?
> What mechanism does Flink use to infer which field(s) form the unique key?
> Thanks a lot!
>
> Best, Henry
>
>
> On Aug 11, 2018, at 5:21 AM, Fabian Hueske wrote:
>
> Hi Henry,
>
> The problem is that the table that results from the query does not have a
> unique key.
> You can only use an upsert sink if the table has a (composite) unique key.
> Since this is not the case, you cannot use an upsert sink.
> However, you can implement a RetractStreamTableSink, which allows you to
> write any kind of Table (append-only/update, keyed/non-keyed) to an
> external system.
>
> Best, Fabian
>
> 2018-08-10 17:06 GMT+02:00 徐涛 :
>
>> Hi All,
>> I am using Flink 1.6 to build some real-time programs. I want to write
>> the output to a table sink; the code is below. At first I used an append table
>> sink, but the error message told me that I should use an upsert table sink, so
>> I wrote one. But still another error, “Caused by:
>> org.apache.flink.table.api.TableException: UpsertStreamTableSink
>> requires that Table has a full primary keys if it is updated.”, comes
>> out, which blocks me. My question is how to define the table keys in this
>> scenario. I also checked the exception stack and found that the system infers
>> the key fields via
>> val tableKeys: Option[Array[String]] = 
>> UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan),
>> so I wonder how to make that function return a value?
>> Thanks a lot !!!
>>
>> var praise = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM 
>> praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
>> DAY),article_id" )
>> tableEnv.registerTable("praiseAggr", praise)
>>
>> var comment = tableEnv.sqlQuery(s"SELECT article_id,hll(from_uid) as CU 
>> FROM comment GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
>> DAY),article_id" )
>> tableEnv.registerTable("commentAggr", comment)
>>
>> var reader = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as RU FROM 
>> reader GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
>> DAY),article_id" )
>> tableEnv.registerTable("readerAggr", reader)
>>
>> var finalTable = tableEnv.sqlUpdate(s"insert into "+ sinkTableName + " " 
>> +  " SELECT p.article_id,p.PU,c.CU,r.RU from praiseAggr p FULL OUTER JOIN 
>> commentAggr c on p.article_id=c.article_id FULL OUTER JOIN readerAggr r on 
>> c.article_id=r.article_id")
>>
>>
>>
>>
>> Thank,
>> Henry Xu
>>
>
>
>


Re: flink requires table key when insert into upsert table sink

2018-08-10 Thread 徐涛
Hi Fabian,
Could you give an example of a query that has a unique key?
What mechanism does Flink use to infer which field(s) form the unique key?
Thanks a lot!

Best, Henry

> On Aug 11, 2018, at 5:21 AM, Fabian Hueske wrote:
> 
> Hi Henry,
> 
> The problem is that the table that results from the query does not have a 
> unique key. 
> You can only use an upsert sink if the table has a (composite) unique key. 
> Since this is not the case, you cannot use an upsert sink.
> However, you can implement a RetractStreamTableSink, which allows you to write 
> any kind of Table (append-only/update, keyed/non-keyed) to an external system.
> 
> Best, Fabian
> 
> 2018-08-10 17:06 GMT+02:00 徐涛:
> Hi All,
>   I am using Flink 1.6 to build some real-time programs. I want to 
> write the output to a table sink; the code is below. At first I used an append 
> table sink, but the error message told me that I should use an upsert table sink, 
> so I wrote one. But still another error, “Caused by: 
> org.apache.flink.table.api.TableException: UpsertStreamTableSink requires 
> that Table has a full primary keys if it is updated.”, comes out, which blocks 
> me. My question is how to define the table keys in this scenario. I also checked 
> the exception stack and found that the system infers the key fields via 
> val tableKeys: Option[Array[String]] = 
> UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan), so I wonder how to make 
> that function return a value?
> Thanks a lot !!!
> var praise = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM 
> praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
> DAY),article_id" )
> tableEnv.registerTable("praiseAggr", praise)
> 
> var comment = tableEnv.sqlQuery(s"SELECT article_id,hll(from_uid) as CU 
> FROM comment GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
> DAY),article_id" )
> tableEnv.registerTable("commentAggr", comment)
> 
> var reader = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as RU FROM 
> reader GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
> DAY),article_id" )
> tableEnv.registerTable("readerAggr", reader)
> 
> var finalTable = tableEnv.sqlUpdate(s"insert into "+ sinkTableName + " " 
> +  " SELECT p.article_id,p.PU,c.CU,r.RU from praiseAggr p FULL OUTER JOIN 
> commentAggr c on p.article_id=c.article_id FULL OUTER JOIN readerAggr r on 
> c.article_id=r.article_id")
> 
>   
> 
>   
> 
> Thank,
> Henry Xu
> 



Re: flink requires table key when insert into upsert table sink

2018-08-10 Thread Fabian Hueske
Hi Henry,

The problem is that the table that results from the query does not have a
unique key.
You can only use an upsert sink if the table has a (composite) unique key.
Since this is not the case, you cannot use an upsert sink.
However, you can implement a RetractStreamTableSink, which allows you to
write any kind of Table (append-only/update, keyed/non-keyed) to an
external system.

Best, Fabian

2018-08-10 17:06 GMT+02:00 徐涛 :

> Hi All,
> I am using Flink 1.6 to build some real-time programs. I want to write
> the output to a table sink; the code is below. At first I used an append table
> sink, but the error message told me that I should use an upsert table sink, so
> I wrote one. But still another error, “Caused by: 
> org.apache.flink.table.api.TableException:
> UpsertStreamTableSink requires that Table has a full primary keys if it is
> updated.”, comes out, which blocks me. My question is how to define the table
> keys in this scenario. I also checked the exception stack and found that the
> system infers the key fields via
> val tableKeys: Option[Array[String]] = UpdatingPlanChecker.
> getUniqueKeyFields(optimizedPlan), so I wonder how to make that function
> return a value?
> Thanks a lot !!!
>
> var praise = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM 
> praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
> DAY),article_id" )
> tableEnv.registerTable("praiseAggr", praise)
>
> var comment = tableEnv.sqlQuery(s"SELECT article_id,hll(from_uid) as CU 
> FROM comment GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
> DAY),article_id" )
> tableEnv.registerTable("commentAggr", comment)
>
> var reader = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as RU FROM 
> reader GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
> DAY),article_id" )
> tableEnv.registerTable("readerAggr", reader)
>
> var finalTable = tableEnv.sqlUpdate(s"insert into "+ sinkTableName + " " 
> +  " SELECT p.article_id,p.PU,c.CU,r.RU from praiseAggr p FULL OUTER JOIN 
> commentAggr c on p.article_id=c.article_id FULL OUTER JOIN readerAggr r on 
> c.article_id=r.article_id")
>
>
>
>
> Thank,
> Henry Xu
>


Re: Dataset.distinct - Question on deterministic results

2018-08-10 Thread Will Bastian
Fabian,
Thanks for the clear response. You addressed my question, and the
suggestions provide clear context on how to address it.

Best,
Will


On Fri, Aug 10, 2018 at 5:52 AM Fabian Hueske  wrote:

> Hi Will,
>
> The distinct operator is implemented as a groupBy(distinctKeys) and a
> ReduceFunction that returns the first argument.
> Hence, it depends on the order in which the records are processed by the
> ReduceFunction.
>
> Flink does not maintain a deterministic order because it is quite
> expensive in distributed systems.
> There are a few aspects that result in random order:
> - lazy split assignment
> - combiners (which are automatically added for ReduceFunctions)
> - network shuffles
>
> There are two ways to address this issue:
> 1) Fully sort the input of the combiners and reducers on all attributes.
> 2) Use a custom ReduceFunction that compares both input records on all
> (non-distinct-key) fields to determine which record to return.
>
> I would go for the second approach because it is more efficient (no need
> to fully sort before the combiner).
>
> Best, Fabian
>
> 2018-08-09 18:12 GMT+02:00 Will Bastian :
>
>> I'm operating on a data set with some challenges to overcome. They are:
>>
>>    1. There is the possibility of multiple entries for a single key, and
>>    2. For a single key, there may be multiple unique value-tuples
>>
>> For example
>> key, val1, val2, val3
>> 1,  0,0,0
>> 1,  0,0,0
>> 1,  1,0,0
>> 2,  1,1,1
>> 2,  1,1,1
>> 2,  1,1,0
>> 1,  0,0,0
>>
>> I've found when executing mySet.distinct(_.key) on the above, that my
>> final results suggest distinct isn't always pulling the same
>> record/value-tuple on every run.
>>
>> Fully understanding that the use of distinct I've outlined above isn't
>> optimal (we don't know, or care which value-tuple we get, we just want it
>> to be consistent on each run), I wanted to validate whether what I believe
>> I'm observing is accurate. Specifically, in this example is Flink reducing
>> by key with no concern for value, and we can expect the possibility that we
>> may pull different instances back on each distinct call?
>>
>> Thanks,
>> Will
>>
>
>


Re: Yahoo Streaming Benchmark on a Flink 1.5 cluster

2018-08-10 Thread Naum Gjorgjeski
Hi all,


I still cannot resolve the problem. Can I please get any advice for it? Thank 
you.


Best regards,

Naum Gjorgjeski


From: Naum Gjorgjeski 
Sent: Wednesday, August 1, 2018 1:05 AM
To: user@flink.apache.org
Subject: Yahoo Streaming Benchmark on a Flink 1.5 cluster


Hi,

I am trying to run the data Artisans version of the Yahoo Streaming Benchmark. 
The benchmark applications are written for Flink 1.0.1. However, I need them 
to run on a Flink 1.5 cluster. When I try to build the benchmark applications 
with any version of Flink from 1.3.0 or higher, I get many compile errors. The 
compile errors state that some of the classes and methods cannot be found 
(because part of the Flink API has changed in recent versions).

The classes that cannot be found are:
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner
org.apache.flink.api.common.state.OperatorState
org.apache.flink.api.common.state.StateBackend
org.apache.flink.runtime.state.AsynchronousStateHandle
org.apache.flink.runtime.state.StateHandle
org.apache.flink.streaming.runtime.tasks.StreamTaskState
org.apache.flink.runtime.state.AbstractStateBackend.CheckpointStateOutputView

The methods that cannot be found are:
org.apache.flink.streaming.api.operators.StreamOperator.snapshotOperatorState(long,long)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStateBackend()
org.apache.flink.runtime.state.AbstractStateBackend.createCheckpointStateOutputView(long,long)

I was able to find a substitution for a few of them (e.g. using 
FlinkFixedPartitioner instead of FixedPartitioner), but for most of them there 
are no straightforward substitutions. Could you please give me some advice on 
how to resolve this problem? Thank you in advance.

Best regards,
Naum



flink requires table key when insert into upsert table sink

2018-08-10 Thread 徐涛
Hi All,
I am using Flink 1.6 to build some real-time programs. I want to 
write the output to a table sink; the code is below. At first I used an append 
table sink, but the error message told me that I should use an upsert table sink, 
so I wrote one. But still another error, “Caused by: 
org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that 
Table has a full primary keys if it is updated.”, comes out, which blocks me. My 
question is how to define the table keys in this scenario. I also checked the 
exception stack and found that the system infers the key fields via 
val tableKeys: Option[Array[String]] = 
UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan), so I wonder how to make 
that function return a value?
Thanks a lot !!!
var praise = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM 
praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
DAY),article_id" )
tableEnv.registerTable("praiseAggr", praise)

var comment = tableEnv.sqlQuery(s"SELECT article_id,hll(from_uid) as CU 
FROM comment GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
DAY),article_id" )
tableEnv.registerTable("commentAggr", comment)

var reader = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as RU FROM 
reader GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' 
DAY),article_id" )
tableEnv.registerTable("readerAggr", reader)

var finalTable = tableEnv.sqlUpdate(s"insert into "+ sinkTableName + " " +  
" SELECT p.article_id,p.PU,c.CU,r.RU from praiseAggr p FULL OUTER JOIN 
commentAggr c on p.article_id=c.article_id FULL OUTER JOIN readerAggr r on 
c.article_id=r.article_id")





Thank,
Henry Xu

Re: JDBCInputFormat and SplitDataProperties

2018-08-10 Thread Alexis Sarda
It ended up being a wrong configuration of the cluster; there was only 1
task manager with 1 slot.

If I submit a job with "flink run -p 24 ...", will the job hang until at
least 24 slots are available?

Regards,
Alexis.

On Fri, 10 Aug 2018, 14:01 Fabian Hueske  wrote:

> Can you share the plan for the program?
>
> Are you sure that more than 1 split is generated by the JdbcInputFormat?
>
> 2018-08-10 12:04 GMT+02:00 Alexis Sarda :
>
>> It seems I may have spoken too soon. After executing the job with more
>> data, I can see the following things in the Flink dashboard:
>>
>> - The first subtask is a chained DataSource -> GroupCombine. Even with
>> parallelism set to 24 and a ParameterValuesProvider returning
>> Array(Array("first"), Array("second")), only 1 thread processed all records.
>> - The second subtask is a Sorted Group Reduce, and I see two weird things:
>>   + The first subtask sent 5,923,802 records, yet the second subtask only
>> received 5,575,154 records?
>>   + Again, everything was done in a single thread, even though a groupBy
>> was used.
>> - The third and final subtask is a sink that saves back to the database.
>>
>> Does anyone know why parallelism is not being used?
>>
>> Regards,
>> Alexis.
>>
>>
>> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda 
>> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
>>> declares javaSet as private[flink], so I cannot access it directly.
>>> Nevertheless, I managed to get around it by using the java environment:
>>>
>>> val env = org.apache.flink.api.java.ExecutionEnvironment.
>>> getExecutionEnvironment
>>>
>>> val inputFormat = getInputFormat(query, dbUrl, properties)
>>> val outputFormat = getOutputFormat(dbUrl, properties)
>>>
>>> val source = env.createInput(inputFormat)
>>> val sdp = source.getSplitDataProperties
>>> sdp.splitsPartitionedBy(0)
>>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>>
>>> // transform java DataSet to scala DataSet...
>>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>>   .groupBy(0, 1)
>>>   .combineGroup(groupCombiner)
>>>   .withForwardedFields("f0->_1")
>>>   .groupBy(0, 1)
>>>   .reduceGroup(groupReducer)
>>>   .withForwardedFields("_1")
>>>   .output(outputFormat)
>>>
>>> It seems to work well, and the semantic annotation does remove a hash
>>> partition from the execution plan.
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske  wrote:
>>>
 Hi Alexis,

 The Scala API does not expose a DataSource object but only a Scala
 DataSet which wraps the Java object.
 You can get the SplitDataProperties from the Scala DataSet as follows:

 val dbData: DataSet[...] = ???
 val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties

 So you first have to get the wrapped Java DataSet, cast it to
 DataSource and then get the properties.
 It's not very nice, but should work.

 In order to use SDPs, you should be a bit familiar with how physical data
 properties are propagated and discarded in the optimizer.
 For example, applying a simple MapFunction removes all properties
 because the function might have changed the fields on which a DataSet is
 partitioned or sorted.
 You can expose the behavior of a function to the optimizer by using
 Semantic Annotations [1]

 Some comments on the code and plan you shared:
 - You might want to add hostname to ORDER BY to have the output grouped
 by (ts, hostname).
 - Check the Global and Local data properties in the plan to validate
 that the SDP were correctly interpreted.
 - If the data is already correctly partitioned and sorted, you might
 not need the Combiners. In either case, you probably want to annotate them
 with Forward Field annotations.

 The number of source tasks is unrelated to the number of splits. If you
 have more tasks than splits, some tasks won't process any data.

 Best, Fabian

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations


 2018-08-08 14:10 GMT+02:00 Alexis Sarda :

> Hi Fabian,
>
> Thanks for the clarification. I have a few remarks, but let me provide
> more concrete information. You can find the query I'm using, the
> JDBCInputFormat creation, and the execution plan in this github gist:
>
> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>
> I cannot call getSplitDataProperties because
> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
> code, I do this instead:
>
> val javaEnv =
> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
> "example")
>
> which feels wrong (the constructor doesn't accept a Scala environment).
> 

Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Fabian Hueske
Hi Averell,

Conceptually, you are right. Checkpoints are taken at every operator at the
same "logical" time.
It is not important that each operator checkpoints at the same wall-clock
time. Instead, they need to take a checkpoint when they have processed the
same input.
This is implemented with so-called checkpoint barriers, which are special
records that are injected at the sources.
[Simplification] When an operator receives a barrier, it performs a
checkpoint. [/Simplification]
This way, we do not need to pause the processing of all operators but can
perform the checkpoints locally at each operator.
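
As a side note, checkpointing itself is simply enabled on the environment; the
interval below is only an example value:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Barriers are injected at the sources at this interval; each operator
// checkpoints locally when a barrier reaches it.
env.enableCheckpointing(60000) // every 60 seconds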

This page of the internals documentation should help you understand how the
mechanism works in detail [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/stream_checkpointing.html

2018-08-10 14:43 GMT+02:00 Averell :

> Thank you Vino, Jorn, and Fabian.
> Please forgive my ignorance, as I am still not able to fully
> understand state/checkpointing and the statement that Fabian gave earlier:
> "/In either case, some record will be read twice but if reading position
> can
> be reset, you can still have exactly-once state consistency because the
> state is reset as well./"
>
> My current understanding is: checkpointing is managed at the
> Execution-Environment level, and it would happen at the same time at all
> the
> operators of the pipeline. Is this true?
> My concern here is how to manage that synchronization? It would be quite
> possible that at different operators, checkpointing happens at some
> milliseconds apart, which would lead to duplicated or missed records,
> wouldn't it?
>
> I tried to read Flink's documentation about managing state here
> (stable/dev/stream/state/state.html). However, I have not been able to find
> the information I am looking for.
> Please help point me to the right place.
>
> Thanks and best regards,
> Averell.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Averell
Thank you Vino, Jorn, and Fabian.
Please forgive my ignorance, as I am still not able to fully
understand state/checkpointing and the statement that Fabian gave earlier:
"/In either case, some record will be read twice but if reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well./"

My current understanding is: checkpointing is managed at the
Execution-Environment level, and it would happen at the same time at all the
operators of the pipeline. Is this true?
My concern here is how to manage that synchronization? It would be quite
possible that at different operators, checkpointing happens at some
milliseconds apart, which would lead to duplicated or missed records,
wouldn't it?

I tried to read Flink's documentation about managing state here
(stable/dev/stream/state/state.html). However, I have not been able to find the
information I am looking for.
Please help point me to the right place.

Thanks and best regards,
Averell.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: JDBCInputFormat and SplitDataProperties

2018-08-10 Thread Fabian Hueske
Can you share the plan for the program?

Are you sure that more than 1 split is generated by the JdbcInputFormat?

2018-08-10 12:04 GMT+02:00 Alexis Sarda :

> It seems I may have spoken too soon. After executing the job with more
> data, I can see the following things in the Flink dashboard:
>
> - The first subtask is a chained DataSource -> GroupCombine. Even with
> parallelism set to 24 and a ParameterValuesProvider returning
> Array(Array("first"), Array("second")), only 1 thread processed all records.
> - The second subtask is a Sorted Group Reduce, and I see two weird things:
>   + The first subtask sent 5,923,802 records, yet the second subtask only
> received 5,575,154 records?
>   + Again, everything was done in a single thread, even though a groupBy
> was used.
> - The third and final subtask is a sink that saves back to the database.
>
> Does anyone know why parallelism is not being used?
>
> Regards,
> Alexis.
>
>
> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda 
> wrote:
>
>> Hi Fabian,
>>
>> Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
>> declares javaSet as private[flink], so I cannot access it directly.
>> Nevertheless, I managed to get around it by using the java environment:
>>
>> val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>
>> val inputFormat = getInputFormat(query, dbUrl, properties)
>> val outputFormat = getOutputFormat(dbUrl, properties)
>>
>> val source = env.createInput(inputFormat)
>> val sdp = source.getSplitDataProperties
>> sdp.splitsPartitionedBy(0)
>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>
>> // transform java DataSet to scala DataSet...
>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>   .groupBy(0, 1)
>>   .combineGroup(groupCombiner)
>>   .withForwardedFields("f0->_1")
>>   .groupBy(0, 1)
>>   .reduceGroup(groupReducer)
>>   .withForwardedFields("_1")
>>   .output(outputFormat)
>>
>> It seems to work well, and the semantic annotation does remove a hash
>> partition from the execution plan.
>>
>> Regards,
>> Alexis.
>>
>>
>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske  wrote:
>>
>>> Hi Alexis,
>>>
>>> The Scala API does not expose a DataSource object but only a Scala
>>> DataSet which wraps the Java object.
>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>
>>> val dbData: DataSet[...] = ???
>>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>>
>>> So you first have to get the wrapped Java DataSet, cast it to DataSource
>>> and then get the properties.
>>> It's not very nice, but should work.
>>>
>>> In order to use SDPs, you should be a bit familiar with how physical data
>>> properties are propagated and discarded in the optimizer.
>>> For example, applying a simple MapFunction removes all properties
>>> because the function might have changed the fields on which a DataSet is
>>> partitioned or sorted.
>>> You can expose the behavior of a function to the optimizer by using
>>> Semantic Annotations [1]
>>>
>>> Some comments on the code and plan you shared:
>>> - You might want to add hostname to ORDER BY to have the output grouped
>>> by (ts, hostname).
>>> - Check the Global and Local data properties in the plan to validate
>>> that the SDP were correctly interpreted.
>>> - If the data is already correctly partitioned and sorted, you might not
>>> need the Combiners. In either case, you probably want to annotate them with
>>> Forward Field annotations.
>>>
>>> The number of source tasks is unrelated to the number of splits. If you
>>> have more tasks than splits, some tasks won't process any data.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.5/dev/batch/#semantic-annotations
>>>
>>>
>>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda :
>>>
 Hi Fabian,

 Thanks for the clarification. I have a few remarks, but let me provide
 more concrete information. You can find the query I'm using, the
 JDBCInputFormat creation, and the execution plan in this github gist:

 https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d

 I cannot call getSplitDataProperties because
 env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
 code, I do this instead:

 val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.
 getExecutionEnvironment
 val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
 "example")

 which feels wrong (the constructor doesn't accept a Scala environment).
 Is there a better alternative?

 I see absolutely no difference in the execution plan whether I use SDP
 or not, so therefore the results are indeed the same. Is this expected?

 My ParameterValuesProvider specifies 2 splits, yet the execution plan
 shows Parallelism=24. Even the source code is a bit ambiguous, considering
 that the constructor for GenericInputSplit takes two parameters:
 partitionNumber and totalNumberOfPartitions.

Re: Standalone cluster instability

2018-08-10 Thread Piotr Nowojski
Hi,

Please post full TaskManager logs, including stderr and stdout. (Have you 
checked the stderr/stdout for some messages?)

I can think of a couple of reasons:
1. process segfault
2. process killed by the OS
3. OS failure

1. Should be visible as a message in the stderr/stdout files and can be caused 
by, for example, a JVM, RocksDB, or other native library/code bug. 
2. Is your system maybe running out of memory? The kernel might kill the process 
if that’s happening. You can also check the system (Linux?) logs for errors that 
correlate in time. Where those logs are depends on your OS. 
3. This might be tricky, but I have, for example, seen kernel failures that 
prevented any messages from being logged. Besides this TaskManager failure, is 
your machine operating normally, without any other problems/crashes/restarts?

Piotrek

> On 10 Aug 2018, at 06:59, Shailesh Jain  wrote:
> 
> Hi,
> 
> I hit a similar issue yesterday, the task manager died suspiciously, no error 
> logs in the task manager logs, but I see the following exceptions in the job 
> manager logs:
> 
> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting
>   - Association to [akka.tcp://flink@localhost:34483] with UID 
> [328996232] irrecoverably failed. Quarantining address.
> java.util.concurrent.TimeoutException: Remote system has been silent for too 
> long. (more than 48.0 hours)
> at 
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at 
> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> but almost 3 days later it hit this:
> 
> 2018-08-08 13:22:00,061 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Internal 
> state machine job (1057c13d169dae609466210174e2cc8b) switched from state 
> RUNNING to FAILING.
> java.lang.Exception: TaskManager was lost/killed: 
> 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
> at 
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
> at 
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
> at 
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
> at 
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
> at 
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
> at org.apache.flink.runtime.jobmanager.JobManager.org 
> $apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
> at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
> at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> 

Re: Flink Rebalance

2018-08-10 Thread antonio saldivar
Hi Fabian,

Thank you. Yes, there are just map functions; I will do it that way, with
method calls, to get it faster.

On Fri, Aug 10, 2018, 5:58 AM Fabian Hueske  wrote:

> Hi,
>
> Elias and Paul have good points.
> I think the performance degradation is mostly due to the lack of function
> chaining in the rebalance case.
>
> If all steps are just map functions, they can be chained in the
> no-rebalance case.
> That means, records are passed via function calls.
> If you add rebalancing, records will be passed between map functions via
> serialization, network transfer, and deserialization.
> This is of course much more expensive than calling a method.
>
> Best, Fabian
>
> 2018-08-10 4:25 GMT+02:00 Paul Lam :
>
>> Hi Antonio,
>>
>> AFAIK, there are two reasons for this:
>>
>> 1. Rebalancing itself brings latency because it takes time to
>> redistribute the elements.
>> 2. Rebalancing also messes up the order in the Kafka topic partitions,
>> and often makes an event-time window wait longer to trigger if you’re
>> using the event-time characteristic.
>>
>> Best Regards,
>> Paul Lam
>>
>>
>>
>> On Aug 10, 2018, at 05:49, antonio saldivar wrote:
>>
>> Hello
>>
>> Sending ~450 elements per second (the values are in milliseconds, start
>> to end),
>> I went from this with rebalance:
>> +------------+
>> | AVGWINDOW  |
>> +------------+
>> | 32131.0853 |
>> +------------+
>>
>> to this without rebalance:
>>
>> +------------+
>> | AVGWINDOW  |
>> +------------+
>> | 70.2077    |
>> +------------+
>>
>> On Thu, Aug 9, 2018 at 17:42, Elias Levy (<
>> fearsome.lucid...@gmail.com>) wrote:
>>
>>> What do you consider a lot of latency?  The rebalance will require
>>> serializing / deserializing the data as it gets distributed.  Depending on
>>> the complexity of your records and the efficiency of your serializers, that
>>> could have a significant impact on your performance.
>>>
>>> On Thu, Aug 9, 2018 at 2:14 PM antonio saldivar 
>>> wrote:
>>>
 Hello

 Does anyone know why adding "rebalance()" to my .map steps adds
 a lot of latency compared to not having rebalance?


 My topic has 44 Kafka partitions and I have 44 Flink task managers.

 execution plan looks like this when I add rebalance but it is adding a
 lot of latency

 kafka-src -> rebalance -> step1 -> rebalance ->step2->rebalance ->
 kafka-sink

 Thank you
 regards


>>
>


Re: JDBCInputFormat and SplitDataProperties

2018-08-10 Thread Alexis Sarda
It seems I may have spoken too soon. After executing the job with more
data, I can see the following things in the Flink dashboard:

- The first subtask is a chained DataSource -> GroupCombine. Even with
parallelism set to 24 and a ParameterValuesProvider returning
Array(Array("first"), Array("second")), only 1 thread processed all records.
- The second subtask is a Sorted Group Reduce, and I see two weird things:
  + The first subtask sent 5,923,802 records, yet the second subtask only
received 5,575,154 records?
  + Again, everything was done in a single thread, even though a groupBy
was used.
- The third and final subtask is a sink that saves back to the database.

Does anyone know why parallelism is not being used?

Regards,
Alexis.


On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda  wrote:

> Hi Fabian,
>
> Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
> declares javaSet as private[flink], so I cannot access it directly.
> Nevertheless, I managed to get around it by using the java environment:
>
> val env = org.apache.flink.api.java.ExecutionEnvironment.
> getExecutionEnvironment
>
> val inputFormat = getInputFormat(query, dbUrl, properties)
> val outputFormat = getOutputFormat(dbUrl, properties)
>
> val source = env.createInput(inputFormat)
> val sdp = source.getSplitDataProperties
> sdp.splitsPartitionedBy(0)
> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>
> // transform java DataSet to scala DataSet...
> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>   .groupBy(0, 1)
>   .combineGroup(groupCombiner)
>   .withForwardedFields("f0->_1")
>   .groupBy(0, 1)
>   .reduceGroup(groupReducer)
>   .withForwardedFields("_1")
>   .output(outputFormat)
>
> It seems to work well, and the semantic annotation does remove a hash
> partition from the execution plan.
>
> Regards,
> Alexis.
>
>
> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske  wrote:
>
>> Hi Alexis,
>>
>> The Scala API does not expose a DataSource object but only a Scala
>> DataSet which wraps the Java object.
>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>
>> val dbData: DataSet[...] = ???
>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>
>> So you first have to get the wrapped Java DataSet, cast it to DataSource
>> and then get the properties.
>> It's not very nice, but should work.
>>
>> In order to use SDPs, you should be a bit familiar with how physical data
>> properties are propagated and discarded in the optimizer.
>> For example, applying a simple MapFunction removes all properties because
>> the function might have changed the fields on which a DataSet is
>> partitioned or sorted.
>> You can expose the behavior of a function to the optimizer by using
>> Semantic Annotations [1]
>>
>> Some comments on the code and plan you shared:
>> - You might want to add hostname to ORDER BY to have the output grouped
>> by (ts, hostname).
>> - Check the Global and Local data properties in the plan to validate that
>> the SDP were correctly interpreted.
>> - If the data is already correctly partitioned and sorted, you might not
>> need the Combiners. In either case, you probably want to annotate them with
>> Forward Field annotations.
>>
>> The number of source tasks is unrelated to the number of splits. If you
>> have more tasks than splits, some tasks won't process any data.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>
>>
>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda :
>>
>>> Hi Fabian,
>>>
>>> Thanks for the clarification. I have a few remarks, but let me provide
>>> more concrete information. You can find the query I'm using, the
>>> JDBCInputFormat creation, and the execution plan in this github gist:
>>>
>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>
>>> I cannot call getSplitDataProperties because
>>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
>>> code, I do this instead:
>>>
>>> val javaEnv =
>>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>>> "example")
>>>
>>> which feels wrong (the constructor doesn't accept a Scala environment).
>>> Is there a better alternative?
>>>
>>> I see absolutely no difference in the execution plan whether I use SDP
>>> or not, so therefore the results are indeed the same. Is this expected?
>>>
>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>>> that the constructor for GenericInputSplit takes two parameters:
>>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>>> 2 splits divided into 24 partitions?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>>
>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske  wrote:
>>>
 Hi Alexis,

 First of all, I think 

Re: Flink Rebalance

2018-08-10 Thread Fabian Hueske
Hi,

Elias and Paul have good points.
I think the performance degradation is mostly due to the lack of function
chaining in the rebalance case.

If all steps are just map functions, they can be chained in the
no-rebalance case.
That means, records are passed via function calls.
If you add rebalancing, records will be passed between map functions via
serialization, network transfer, and deserialization.
This is of course much more expensive than calling a method.
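
As a rough sketch (the String elements and the map bodies below are just
placeholders for your own pipeline):

import org.apache.flink.streaming.api.scala._

// Chained: the three maps run in one task; records are passed by method calls.
def chained(src: DataStream[String]): DataStream[String] =
  src.map(_.trim).map(_.toUpperCase).map(s => s"out:$s")

// Rebalanced: every hop goes through serialization, a network shuffle,
// and deserialization, which is far more expensive.
def rebalanced(src: DataStream[String]): DataStream[String] =
  src.rebalance.map(_.trim)
    .rebalance.map(_.toUpperCase)
    .rebalance.map(s => s"out:$s")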

Best, Fabian

2018-08-10 4:25 GMT+02:00 Paul Lam :

> Hi Antonio,
>
> AFAIK, there are two reasons for this:
>
> 1. Rebalancing itself brings latency because it takes time to redistribute
> the elements.
> 2. Rebalancing also messes up the order in the Kafka topic partitions, and
> often makes an event-time window wait longer to trigger if you’re using
> the event-time characteristic.
>
> Best Regards,
> Paul Lam
>
>
>
> On Aug 10, 2018, at 05:49, antonio saldivar wrote:
>
> Hello
>
> Sending ~450 elements per second (the values are in milliseconds, start to
> end),
> I went from this with rebalance:
> +------------+
> | AVGWINDOW  |
> +------------+
> | 32131.0853 |
> +------------+
>
> to this without rebalance:
>
> +------------+
> | AVGWINDOW  |
> +------------+
> | 70.2077    |
> +------------+
>
> On Thu, Aug 9, 2018 at 17:42, Elias Levy wrote:
>
>> What do you consider a lot of latency?  The rebalance will require
>> serializing / deserializing the data as it gets distributed.  Depending on
>> the complexity of your records and the efficiency of your serializers, that
>> could have a significant impact on your performance.
>>
>> On Thu, Aug 9, 2018 at 2:14 PM antonio saldivar 
>> wrote:
>>
>>> Hello
>>>
>>> Does anyone know why adding "rebalance()" to my .map steps adds
>>> a lot of latency compared to not having rebalance?
>>>
>>>
>>> My topic has 44 Kafka partitions and I have 44 Flink task managers.
>>>
>>> execution plan looks like this when I add rebalance but it is adding a
>>> lot of latency
>>>
>>> kafka-src -> rebalance -> step1 -> rebalance ->step2->rebalance ->
>>> kafka-sink
>>>
>>> Thank you
>>> regards
>>>
>>>
>


Re: Dataset.distinct - Question on deterministic results

2018-08-10 Thread Fabian Hueske
Hi Will,

The distinct operator is implemented as a groupBy(distinctKeys) and a
ReduceFunction that returns the first argument.
Hence, it depends on the order in which the records are processed by the
ReduceFunction.

Flink does not maintain a deterministic order because it is quite expensive
in distributed systems.
There are a few aspects that result in random order:
- lazy split assignment
- combiners (which are automatically added for ReduceFunctions)
- network shuffles

There are two ways to address this issue:
1) Fully sort the input of the combiners and reducers on all attributes.
2) Use a custom ReduceFunction that compares both input records on all
(non-distinct-key) fields to determine which record to return.

I would go for the second approach because it is more efficient (no need to
fully sort before the combiner).
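
A minimal sketch of option 2), assuming a record type that matches the example
data (one Int key plus three Int values; the names are illustrative):

import org.apache.flink.api.scala._

case class Rec(key: Int, v1: Int, v2: Int, v3: Int)

// Deterministic "distinct by key": always keep the smallest value-tuple per
// key, independent of processing order. The same function acts as combiner
// and reducer, so the surviving record is stable across runs.
def deterministicDistinct(in: DataSet[Rec]): DataSet[Rec] =
  in.groupBy(_.key)
    .reduce { (a, b) =>
      val keepA = Ordering[(Int, Int, Int)]
        .lteq((a.v1, a.v2, a.v3), (b.v1, b.v2, b.v3))
      if (keepA) a else b
    }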

Best, Fabian

2018-08-09 18:12 GMT+02:00 Will Bastian :

> I'm operating on a data set with some challenges to overcome. They are:
>
>    1. There is the possibility of multiple entries for a single key, and
>    2. For a single key, there may be multiple unique value-tuples
>
> For example
> key, val1, val2, val3
> 1,  0,0,0
> 1,  0,0,0
> 1,  1,0,0
> 2,  1,1,1
> 2,  1,1,1
> 2,  1,1,0
> 1,  0,0,0
>
> I've found when executing mySet.distinct(_.key) on the above, that my
> final results suggest distinct isn't always pulling the same
> record/value-tuple on every run.
>
> Fully understanding that the use of distinct I've outlined above isn't
> optimal (we don't know, or care which value-tuple we get, we just want it
> to be consistent on each run), I wanted to validate whether what I believe
> I'm observing is accurate. Specifically, in this example is Flink reducing
> by key with no concern for value, and we can expect the possibility that we
> may pull different instances back on each distinct call?
>
> Thanks,
> Will
>


Re: UTF-16 support for TextInputFormat

2018-08-10 Thread Fabian Hueske
Hi David,

Thanks for digging into the code! I had a quick look into the classes as
well.
As far as I can see, your analysis is correct and the BOM handling in
DelimitedInputFormat and TextInputFormat (and other text-based IFs such as
CsvInputFormat) is broken.
In fact, it's obvious that nobody has paid attention to this yet.

It would be great if you could open a Jira issue and copy your analysis and
solution proposal into it.
While on it, we could also deprecate the (duplicated) setCharsetName()
method from TextInputFormat and redirect it to
DelimitedInputFormat.setCharset().
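
Just to make the Jira description concrete, a rough sketch of the BOM handling
we would expect (resolveUtf16Charset is a made-up helper, not existing Flink
API; the real fix would live where a split is opened):

// Map the first bytes of a file to a concrete charset name.
def resolveUtf16Charset(firstBytes: Array[Byte], requested: String): String = {
  val hasLeBom = firstBytes.length >= 2 &&
    firstBytes(0) == 0xFF.toByte && firstBytes(1) == 0xFE.toByte
  val hasBeBom = firstBytes.length >= 2 &&
    firstBytes(0) == 0xFE.toByte && firstBytes(1) == 0xFF.toByte

  // The BOM wins: plain "UTF-16" is narrowed to the detected endianness, and
  // a conflicting explicit UTF-16BE/UTF-16LE request is corrected as well.
  if (hasLeBom && requested.toUpperCase.startsWith("UTF-16")) "UTF-16LE"
  else if (hasBeBom && requested.toUpperCase.startsWith("UTF-16")) "UTF-16BE"
  else requested
}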

Would you also be interested in contributing a fix for this problem?

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95

2018-08-09 14:55 GMT+02:00 David Dreyfus :

> Hi Fabian,
>
> Thank you for taking my email.
> TextInputFormat.setCharsetName("UTF-16") appears to set the private
> variable TextInputFormat.charsetName.
> It doesn't appear to cause additional behavior that would help interpret
> UTF-16 data.
>
> The method I've tested is calling DelimitedInputFormat.setCharset("UTF-16"),
> which then sets TextInputFormat.charsetName and then modifies the
> previously set delimiterString to construct the proper byte string encoding
> of the the delimiter. This same charsetName is also used in
> TextInputFormat.readRecord() to interpret the bytes read from the file.
>
> There are two problems that this implementation would seem to have when
> using UTF-16.
>
>1. delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java
>will return a Big Endian byte sequence including the Byte Order Mark (BOM).
>The actual text file will not contain a BOM at each line ending, so the
>delimiter will never be read. Moreover, if the actual byte encoding of the
>file is Little Endian, the bytes will be interpreted incorrectly.
>2. TextInputFormat.readRecord() will not see a BOM each time it
>decodes a byte sequence with the String(bytes, offset, numBytes, charset)
>call. Therefore, it will assume Big Endian, which may not always be 
> correct.
>
> While there are likely many solutions, I would think that all of them
> would have to start by reading the BOM from the file when a Split is opened
> and then using that BOM to modify the specified encoding to a BOM specific
> one when the caller doesn't specify one, and to overwrite the caller's
> specification if the BOM is in conflict with the caller's specification.
> That is, if the BOM indicates Little Endian and the caller indicates
> UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
>
> I hope this makes sense and that I haven't been testing incorrectly or
> misreading the code.
>
> Thank you,
> David
>
> On Thu, Aug 9, 2018 at 4:04 AM Fabian Hueske  wrote:
>
>> Hi David,
>>
>> Did you try to set the encoding on the TextInputFormat with
>>
>> TextInputFormat tif = ...
>> tif.setCharsetName("UTF-16");
>>
>> Best, Fabian
>>
>> 2018-08-08 17:45 GMT+02:00 David Dreyfus :
>>
>>> Hello -
>>>
>>> It does not appear that Flink supports a charset encoding of "UTF-16".
>>> It particular, it doesn't appear that Flink consumes the Byte Order Mark
>>> (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. Are there
>>> any plans to enhance Flink to handle UTF-16 with BOM?
>>>
>>> Thank you,
>>> David
>>>
>>
>>


Re: flink telemetry/metrics

2018-08-10 Thread Chesnay Schepler

What is wrong with the metrics that are shown in graphite?

Can you provide us with the metrics section of your flink-conf.yaml?

Are there any metric-related warnings in the TaskManager logs?
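
For comparison, a working Graphite setup in flink-conf.yaml typically looks
roughly like this (the reporter name "grph" and the host/port are placeholders
for your environment):

metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: <your-graphite-host>
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP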

On 09.08.2018 01:38, John O wrote:


I’m working on getting a flink job into production. As part of the 
production requirement, I need telemetry/metrics insight into my flink 
job. I have followed instructions in

https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html

- Added the flink graphite jar to taskmanager/jobmanager lib folder
- configured flink-conf.yaml to enable the Graphite reporter

- Added a simple counter in my flink code

When I submit the job, I can see my counter show up in the Flink web 
UI's Task Metrics section. But the counter does not show up in 
Graphite. Also, the metrics that actually make it to Graphite don't 
seem to be published properly.


Is anyone actually using Graphite Reporter? What is your experience? 
What am I missing?






Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Fabian Hueske
Hi Averell,

One comment regarding what you said:

> As my files are small, I think there would not be much benefit in
checkpointing file offset state.

Checkpointing is not about efficiency but about consistency.
If the position in a split is not checkpointed, your application won't
operate with exactly-once state consistency unless each split produces
exactly one record.

Best, Fabian

2018-08-10 9:10 GMT+02:00 Jörn Franke :

> Or you write a custom file system for Flink... (for the tar part).
> Unfortunately gz files can only be processed single-threaded (there are
> some multi-threaded implementations, but they don’t bring a big gain).
>
> On 10. Aug 2018, at 07:07, vino yang  wrote:
>
> Hi Averell,
>
> In this case, I think you may need to extend Flink's existing source.
> First, read your large tar.gz file; once it has been decompressed, use
> multiple threads to read the records in the source, and then parse the
> data format (map/flatMap might be suitable operators; you can chain
> them with the source because these two operators don't require a data shuffle).
>
> Note that Flink doesn't encourage creating extra threads in UDFs, but I
> don't know if there is a better way for this scenario.
>
> Thanks, vino.
>
> Averell wrote on Fri, Aug 10, 2018 at 12:05 PM:
>
>> Hi Fabian, Vino,
>>
>> I have one more question, which I initially planned to create a new
>> thread,
>> but now I think it is better to ask here:
>> I need to process one big tar.gz file which contains multiple small gz
>> files. What is the best way to do this? I am thinking of having a single
>> thread that reads the TarArchiveStream (which has been decompressed
>> from that tar.gz by Flink automatically) and then distributes the
>> TarArchiveEntry entries to a multi-threaded operator which would process the
>> small files in parallel. If this is feasible, which elements from Flink
>> can I reuse?
>>
>> Thanks a lot.
>> Regards,
>> Averell
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/
>>
>


Re: SQL parallelism setting

2018-08-10 Thread Timo Walther

Hi,

Currently, you can only set the parallelism for the entire Flink job 
using env.setParallelism().
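
For example, a minimal sketch (TableEnvironment.getTableEnvironment is the
factory method of this Flink version):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4) // example value, applies to the whole job
val tableEnv = TableEnvironment.getTableEnvironment(env)
// all operators generated by tableEnv.sqlQuery(...) inherit parallelism 4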


There are rough ideas of how we could improve the situation in the 
future to control the parallelism of individual operators, but this might 
take one or two more releases.


Regards,
Timo

On 10.08.18 at 08:54, Shu Li Zheng wrote:

Hi community,

Is there a way to change parallelism on sqlQuery()?

Regards,

Shu li Zheng





Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Jörn Franke
Or you write a custom file system for Flink... (for the tar part).
Unfortunately gz files can only be processed single-threaded (there are some 
multi-threaded implementations, but they don’t bring a big gain). 

> On 10. Aug 2018, at 07:07, vino yang  wrote:
> 
> Hi Averell,
> 
> In this case, I think you may need to extend Flink's existing source. 
> First, read your large tar.gz file; once it has been decompressed, use 
> multiple threads to read the records in the source, and then parse the 
> data format (map/flatMap might be suitable operators; you can chain them 
> with the source because these two operators don't require a data shuffle).
> 
> Note that Flink doesn't encourage creating extra threads in UDFs, but I don't 
> know if there is a better way for this scenario.
> 
> Thanks, vino.
> 
> Averell wrote on Fri, Aug 10, 2018 at 12:05 PM:
>> Hi Fabian, Vino,
>> 
>> I have one more question, which I initially planned to create a new thread,
>> but now I think it is better to ask here:
>> I need to process one big tar.gz file which contains multiple small gz
>> files. What is the best way to do this? I am thinking of having a single
>> thread that reads the TarArchiveStream (which has been decompressed
>> from that tar.gz by Flink automatically) and then distributes the
>> TarArchiveEntry entries to a multi-threaded operator which would process the
>> small files in parallel. If this is feasible, which elements from Flink
>> can I reuse?
>> 
>> Thanks a lot.
>> Regards,
>> Averell
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


SQL parallelism setting

2018-08-10 Thread Shu Li Zheng
Hi community,

Is there a way to change parallelism on sqlQuery()?

Regards,

Shu li Zheng