Re: Small-files source - partitioning based on prefix of file

2018-08-09 Thread vino yang
Hi Averell,

In this case, I think you may need to extend Flink's existing source.
First, read your large tar.gz file in the source; once it has been
decompressed, read the records and then parse the data format (map/flatMap
would be suitable operators, and you can chain them with the source because
these two operators don't require a data shuffle).

Note that Flink doesn't encourage creating extra threads in UDFs, but I
don't know if there is a better way for this scenario.
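
A rough, untested sketch that avoids extra threads (a single-threaded source
emitting one record per archive entry; the class name is only illustrative,
and it uses Apache Commons Compress for the tar/gzip handling):

import java.io.FileInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.utils.IOUtils
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Single-threaded source: unpacks the tar.gz and emits one
// (entryName, rawBytes) record per contained file.
class TarGzEntrySource(path: String) extends SourceFunction[(String, Array[Byte])] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[(String, Array[Byte])]): Unit = {
    val tar = new TarArchiveInputStream(
      new GzipCompressorInputStream(new FileInputStream(path)))
    try {
      var entry = tar.getNextTarEntry
      while (running && entry != null) {
        if (!entry.isDirectory) {
          // entries are assumed to be small gz files, so an Int-sized buffer is fine
          val buf = new Array[Byte](entry.getSize.toInt)
          IOUtils.readFully(tar, buf)
          // a downstream flatMap (possibly after a rebalance) decompresses and parses each file
          ctx.collect((entry.getName, buf))
        }
        entry = tar.getNextTarEntry
      }
    } finally {
      tar.close()
    }
  }

  override def cancel(): Unit = running = false
}

The small gz entries themselves could then be parsed in the downstream
operator, which can run with a higher parallelism.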

Thanks, vino.

Averell  wrote on Fri, Aug 10, 2018 at 12:05 PM:

> Hi Fabian, Vino,
>
> I have one more question, which I initially planned to create a new thread,
> but now I think it is better to ask here:
> I need to process one big tar.gz file which contains multiple small gz
> files. What is the best way to do this? I am thinking of having one single
> thread process that read the TarArchiveStream (which has been decompressed
> from that tar.gz by Flink automatically), and then distribute the
> TarArchiveEntry entries to a multi-thread operator which would process the
> small files in parallel. If this is feasible, which elements from Flink I
> can reuse?
>
> Thanks a lot.
> Regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Standalone cluster instability

2018-08-09 Thread Shailesh Jain
Hi,

I hit a similar issue yesterday: the task manager died suspiciously with no
error logs in the task manager logs, but I see the following exceptions in
the job manager logs:

2018-08-05 18:03:28,322 ERROR
akka.remote.Remoting  - Association
to [akka.tcp://flink@localhost:34483] with UID [328996232] irrecoverably
failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for
too long. (more than 48.0 hours)
at
akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at
akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

but almost 3 days later it hit this:

2018-08-08 13:22:00,061 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from
state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed:
5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
at
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at
akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
at akka.actor.ActorCell.invoke(ActorCell.scala:494)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

followed by:

2018-08-08 13:22:20,090 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from
state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration. Task to schedule: < Attempt #2 (Source: Custom Source ->
Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID <
fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup
[fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7,
8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler:
Number of 

Re: Small-files source - partitioning based on prefix of file

2018-08-09 Thread Averell
Hi Fabian, Vino,

I have one more question, which I initially planned to ask in a new thread,
but now I think it is better to ask here:
I need to process one big tar.gz file which contains multiple small gz
files. What is the best way to do this? I am thinking of having a single
thread process that reads the TarArchiveStream (which has been decompressed
from that tar.gz by Flink automatically) and then distributes the
TarArchiveEntry entries to a multi-threaded operator which would process the
small files in parallel. If this is feasible, which elements from Flink
can I reuse?

Thanks a lot.
Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Small-files source - partitioning based on prefix of file

2018-08-09 Thread Averell
Thank you Vino and Fabian for your help in answering my questions. As my
files are small, I think there would not be much benefit in checkpointing
the file offset state.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Rebalance

2018-08-09 Thread Paul Lam
Hi Antonio, 

AFAIK, there are two reasons for this:

1. Rebalancing itself adds latency because it takes time to redistribute the
elements across the network.
2. Rebalancing also messes up the order within the Kafka topic partitions, and
often makes an event-time window wait longer to trigger if you're using
the event-time characteristic.
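
For illustration only (the source and map functions below are stand-ins, not
Antonio's actual job), the difference boils down to whether a network
exchange sits between the operators:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val source: DataStream[String] = env.socketTextStream("localhost", 9999) // stand-in for the Kafka source

// With rebalance: every hop is a network exchange with ser/de,
// and per-partition order is lost.
val withRebalance = source
  .rebalance
  .map(_.toUpperCase)   // stand-in for step1
  .rebalance
  .map(_.trim)          // stand-in for step2

// Without rebalance: the maps chain with the source in the same task,
// so records are handed over locally without serialization or shuffling.
val chainedOnly = source
  .map(_.toUpperCase)
  .map(_.trim)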

Best Regards,
Paul Lam


> On Aug 10, 2018, at 05:49, antonio saldivar  wrote:
> 
> Hello
> 
> Sending ~450 elements per second (the values are in milliseconds, start to
> end)
> I went from:
> with Rebalance
> +------------+
> | AVGWINDOW  |
> +------------+
> | 32131.0853 |
> +------------+
> 
> to this without rebalance
> 
> +------------+
> | AVGWINDOW  |
> +------------+
> | 70.2077    |
> +------------+
> 
> On Thu, Aug 9, 2018 at 17:42, Elias Levy wrote:
> What do you consider a lot of latency?  The rebalance will require 
> serializing / deserializing the data as it gets distributed.  Depending on 
> the complexity of your records and the efficiency of your serializers, that 
> could have a significant impact on your performance.
> 
> On Thu, Aug 9, 2018 at 2:14 PM antonio saldivar  wrote:
> Hello
> 
> Does anyone know why when I add "rebalance()" to my .map steps is adding a 
> lot of latency rather than not having rebalance.
> 
> 
> I have kafka partitions in my topic 44 and 44 flink task manager
> 
> execution plan looks like this when I add rebalance but it is adding a lot of 
> latency
> 
> kafka-src -> rebalance -> step1 -> rebalance ->step2->rebalance -> kafka-sink
> 
> Thank you 
> regards
> 



Re: Flink Rebalance

2018-08-09 Thread antonio saldivar
Hello

Sending ~450 elements per second (the values are in milliseconds, start to
end)
I went from:
with Rebalance

+------------+
| AVGWINDOW  |
+------------+
| 32131.0853 |
+------------+

to this without rebalance

+------------+
| AVGWINDOW  |
+------------+
| 70.2077    |
+------------+

On Thu, Aug 9, 2018 at 17:42, Elias Levy wrote:

> What do you consider a lot of latency?  The rebalance will require
> serializing / deserializing the data as it gets distributed.  Depending on
> the complexity of your records and the efficiency of your serializers, that
> could have a significant impact on your performance.
>
> On Thu, Aug 9, 2018 at 2:14 PM antonio saldivar 
> wrote:
>
>> Hello
>>
>> Does anyone know why when I add "rebalance()" to my .map steps is adding
>> a lot of latency rather than not having rebalance.
>>
>>
>> I have kafka partitions in my topic 44 and 44 flink task manager
>>
>> execution plan looks like this when I add rebalance but it is adding a
>> lot of latency
>>
>> kafka-src -> rebalance -> step1 -> rebalance ->step2->rebalance ->
>> kafka-sink
>>
>> Thank you
>> regards
>>
>>


Re: Flink Rebalance

2018-08-09 Thread Elias Levy
What do you consider a lot of latency?  The rebalance will require
serializing / deserializing the data as it gets distributed.  Depending on
the complexity of your records and the efficiency of your serializers, that
could have a significant impact on your performance.

On Thu, Aug 9, 2018 at 2:14 PM antonio saldivar  wrote:

> Hello
>
> Does anyone know why when I add "rebalance()" to my .map steps is adding a
> lot of latency rather than not having rebalance.
>
>
> I have kafka partitions in my topic 44 and 44 flink task manager
>
> execution plan looks like this when I add rebalance but it is adding a lot
> of latency
>
> kafka-src -> rebalance -> step1 -> rebalance ->step2->rebalance ->
> kafka-sink
>
> Thank you
> regards
>
>


Flink Rebalance

2018-08-09 Thread antonio saldivar
Hello

Does anyone know why adding "rebalance()" to my .map steps introduces a
lot of latency compared to not having rebalance?


My Kafka topic has 44 partitions and I have 44 Flink task managers.

The execution plan looks like this when I add rebalance, but it adds a lot
of latency:

kafka-src -> rebalance -> step1 -> rebalance ->step2->rebalance ->
kafka-sink

Thank you
regards


Re: Getting compilation error in Array[TypeInformation]

2018-08-09 Thread Mich Talebzadeh
Thanks, those suggestions helped.

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 9 Aug 2018 at 16:41, Timo Walther  wrote:

> Hi Mich,
>
> I strongly recommend to read a good Scala programming tutorial before
> writing on a mailing list.
>
> As the error indicates you are missing generic parameters. If you don't
> know the parameter use `Array[TypeInformation[_]]` or `TableSink[_]`. For
> the types class you need to import the types class
> "org.apache.flink.table.api.Types".
>
> Regards,
> Timo
>
>
> On 09.08.18 at 17:18, Mich Talebzadeh wrote:
>
> This is the code in Scala
>
> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
> tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
> 'timeissued, 'price)
> val result = tableEnv.scan("priceTable").filter('ticker === "VOD" &&
> 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
>
> val fieldNames: Array[String] = Array("key", "ticker", "timeissued",
> "price")
> val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
> Types.STRING, Types.STRING, Types.Float)
> val sink: TableSink = new CsvTableSink(writeDirectory+fileName,
> fieldDelim = ",")
> tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes,
> sink)
> result.insertInto("CsvSinkTable")
>
> When compiling I get the following error
>
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
> class TypeInformation takes type parameters
> [error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
> Types.STRING, Types.STRING, Types.Float)
> [error]   ^
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
> not found: value Types
> [error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
> Types.STRING, Types.STRING, Types.Float)
> [error]^
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
> not found: value Types
> [error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
> Types.STRING, Types.STRING, Types.Float)
> [error]  ^
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
> not found: value Types
> [error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
> Types.STRING, Types.STRING, Types.Float)
> [error]
> ^
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
> not found: value Types
> [error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
> Types.STRING, Types.STRING, Types.Float)
> [error]
> ^
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:172:
> trait TableSink takes type parameters
> [error] val sink: TableSink = new
> CsvTableSink(writeDirectory+fileName, fieldDelim = ",")
> [error]   ^
> [error] 6 errors found
>
> May be I am not importing the correct dependencies.
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>


Dataset.distinct - Question on deterministic results

2018-08-09 Thread Will Bastian
I'm operating on a data set with some challenges to overcome. They are:

   1. There is the possibility of multiple entries for a single key,
   and
   2. For a single key, there may be multiple unique value-tuples.

For example
key, val1, val2, val3
1,  0,0,0
1,  0,0,0
1,  1,0,0
2,  1,1,1
2,  1,1,1
2,  1,1,0
1,  0,0,0

I've found when executing mySet.distinct(_.key) on the above, that my final
results suggest distinct isn't always pulling the same record/value-tuple
on every run.

Fully understanding that the use of distinct I've outlined above isn't
optimal (we don't know, or care which value-tuple we get, we just want it
to be consistent on each run), I wanted to validate whether what I believe
I'm observing is accurate. Specifically, in this example is Flink reducing
by key with no concern for value, and we can expect the possibility that we
may pull different instances back on each distinct call?
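
For context, here is a sketch of the kind of deterministic selection we're
considering as a workaround (field names follow the example above; whether
this is the right approach is part of my question): reduce per key and always
keep the smallest value-tuple, so the surviving record no longer depends on
execution order.

import org.apache.flink.api.scala._

case class Rec(key: Int, val1: Int, val2: Int, val3: Int)

// mySet: DataSet[Rec]
def dedupe(mySet: DataSet[Rec]): DataSet[Rec] =
  mySet
    .groupBy(_.key)
    .reduce { (a, b) =>
      // total order over the value fields => the same record wins on every run
      val keepA =
        if (a.val1 != b.val1) a.val1 < b.val1
        else if (a.val2 != b.val2) a.val2 < b.val2
        else a.val3 <= b.val3
      if (keepA) a else b
    }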

Thanks,
Will


Re: Getting compilation error in Array[TypeInformation]

2018-08-09 Thread Timo Walther

Hi Mich,

I strongly recommend reading a good Scala programming tutorial before
writing to a mailing list.


As the error indicates, you are missing generic type parameters. If you don't
know the parameter, use `Array[TypeInformation[_]]` or `TableSink[_]`.
For the Types class, you need to import
"org.apache.flink.table.api.Types".


Regards,
Timo


On 09.08.18 at 17:18, Mich Talebzadeh wrote:

This is the code in Scala

   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 
'timeissued, 'price)
    val result = tableEnv.scan("priceTable").filter('ticker === "VOD" 
&& 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)


    val fieldNames: Array[String] = Array("key", "ticker", 
"timeissued", "price")
    val fieldTypes: Array[TypeInformation] = Array(Types.STRING, 
Types.STRING, Types.STRING, Types.Float)
    val sink: TableSink = new CsvTableSink(writeDirectory+fileName, 
fieldDelim = ",")
    tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, 
sink)

    result.insertInto("CsvSinkTable")

When compiling I get the following error

[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
class TypeInformation takes type parameters
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error]   ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error]    ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error] ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error] ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error] ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:172: 
trait TableSink takes type parameters
[error] val sink: TableSink = new 
CsvTableSink(writeDirectory+fileName, fieldDelim = ",")

[error]   ^
[error] 6 errors found

May be I am not importing the correct dependencies.

Thanks

Dr Mich Talebzadeh

LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for
any loss, damage or destruction of data or any other property which
may arise from relying on this email's technical content is explicitly
disclaimed. The author will in no case be liable for any monetary
damages arising from such loss, damage or destruction.






Getting compilation error in Array[TypeInformation]

2018-08-09 Thread Mich Talebzadeh
This is the code in Scala

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
'timeissued, 'price)
val result = tableEnv.scan("priceTable").filter('ticker === "VOD" &&
'price > 99.0).select('key, 'ticker, 'timeissued, 'price)

val fieldNames: Array[String] = Array("key", "ticker", "timeissued",
"price")
val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
Types.STRING, Types.STRING, Types.Float)
val sink: TableSink = new CsvTableSink(writeDirectory+fileName,
fieldDelim = ",")
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
result.insertInto("CsvSinkTable")

When compiling I get the following error

[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
class TypeInformation takes type parameters
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
Types.STRING, Types.STRING, Types.Float)
[error]   ^
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
Types.STRING, Types.STRING, Types.Float)
[error]^
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
Types.STRING, Types.STRING, Types.Float)
[error]  ^
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
Types.STRING, Types.STRING, Types.Float)
[error]
^
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171:
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING,
Types.STRING, Types.STRING, Types.Float)
[error]
^
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:172:
trait TableSink takes type parameters
[error] val sink: TableSink = new CsvTableSink(writeDirectory+fileName,
fieldDelim = ",")
[error]   ^
[error] 6 errors found

Maybe I am not importing the correct dependencies.

Thanks

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Table API, custom window

2018-08-09 Thread Fabian Hueske
Hi,

regarding the plans: there are no plans to support custom window assigners
and evictors.
There were some thoughts about supporting different result-update
strategies that could be used to return early results or update results in
case of late data.

However, these features are currently not on the near- or mid-term roadmap.
Most contributors are focusing on adding and improving connectors,
improving the SQL client, or adding built-in functions.
There is also ongoing work on better join support.

Best, Fabian

2018-08-09 15:00 GMT+02:00 Timo Walther :

> Hi Oleksandr,
>
> Currently, we don't support custom windows for the Table API. The Table & SQL
> API tries to solve the most common cases, but for more specific logic we
> recommend the DataStream API.
>
> Regards,
> Timo
>
> On 09.08.18 at 14:15, Oleksandr Nitavskyi wrote:
>
> Hello guys,
>
>
>
> I am curious, is there a way to define custom window
> (assigners/trigger/evictor) for Table/Sql Flink API? Looks like
> documentation keep silence about this, but is there are plans for it? Or
> should we go with DataStream API in case we need such kind of functionality?
>
>
>
> Thanks
>
> Oleksandr Nitavskyi
>
>
>


Re: Table API, custom window

2018-08-09 Thread Timo Walther

Hi Oleksandr,

Currently, we don't support custom windows for the Table API. The Table &
SQL API tries to solve the most common cases, but for more specific logic
we recommend the DataStream API.
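
For completeness, a rough sketch of what the DataStream route looks like.
The input and the built-in assigner/trigger/evictor below are stand-ins; each
of the three can be replaced by your own WindowAssigner, Trigger, or Evictor
implementation:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

val env = StreamExecutionEnvironment.getExecutionEnvironment
// stand-in input; a real job would assign timestamps/watermarks
val stream: DataStream[(String, Long)] = env.fromElements(("a", 1L), ("a", 2L))

val windowed = stream
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(15)))                // custom WindowAssigner possible
  .trigger(ContinuousEventTimeTrigger.of[TimeWindow](Time.seconds(10))) // custom Trigger possible
  .evictor(CountEvictor.of[TimeWindow](1000))                           // custom Evictor possible
  .reduce((a, b) => (a._1, a._2 + b._2))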


Regards,
Timo

On 09.08.18 at 14:15, Oleksandr Nitavskyi wrote:


Hello guys,

I am curious, is there a way to define custom window 
(assigners/trigger/evictor) for Table/Sql Flink API? Looks like 
documentation keep silence about this, but is there are plans for it? 
Or should we go with DataStream API in case we need such kind of 
functionality?


Thanks

Oleksandr Nitavskyi





Re: UTF-16 support for TextInputFormat

2018-08-09 Thread David Dreyfus
Hi Fabian,

Thank you for taking my email.
TextInputFormat.setCharsetName("UTF-16") appears to set the private
variable TextInputFormat.charsetName.
It doesn't appear to cause additional behavior that would help interpret
UTF-16 data.

The method I've tested is calling
DelimitedInputFormat.setCharset("UTF-16"), which then sets
TextInputFormat.charsetName and then modifies the previously set
delimiterString to construct the proper byte string encoding of the
delimiter. This same charsetName is also used in
TextInputFormat.readRecord() to interpret the bytes read from the file.

There are two problems that this implementation would seem to have when
using UTF-16.

   1. delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java
   will return a Big Endian byte sequence including the Byte Order Mark (BOM).
   The actual text file will not contain a BOM at each line ending, so the
   delimiter will never be read. Moreover, if the actual byte encoding of the
   file is Little Endian, the bytes will be interpreted incorrectly.
   2. TextInputFormat.readRecord() will not see a BOM each time it decodes
   a byte sequence with the String(bytes, offset, numBytes, charset) call.
   Therefore, it will assume Big Endian, which may not always be correct.

While there are likely many solutions, I would think that all of them would
have to start by reading the BOM from the file when a split is opened, then
using that BOM to pick a byte-order-specific encoding when the caller
doesn't specify one, and to override the caller's specification if the BOM
is in conflict with it.
That is, if the BOM indicates little-endian and the caller indicates
UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
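
A minimal sketch of the BOM handling I have in mind (the helper is
hypothetical; in practice it would be applied to the first two bytes of each
file, e.g. in a custom input format's open()):

// Resolve the effective charset from the first two bytes of the file.
// Falls back to the JDK's UTF-16 default (big-endian) when no BOM is present.
def resolveUtf16Charset(firstTwoBytes: Array[Byte], requested: String): String = {
  val b0 = firstTwoBytes(0) & 0xFF
  val b1 = firstTwoBytes(1) & 0xFF
  (b0, b1) match {
    case (0xFE, 0xFF) => "UTF-16BE" // big-endian BOM
    case (0xFF, 0xFE) => "UTF-16LE" // little-endian BOM
    case _            => if (requested == "UTF-16") "UTF-16BE" else requested
  }
}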

I hope this makes sense and that I haven't been testing incorrectly or
misreading the code.

Thank you,
David

On Thu, Aug 9, 2018 at 4:04 AM Fabian Hueske  wrote:

> Hi David,
>
> Did you try to set the encoding on the TextInputFormat with
>
> TextInputFormat tif = ...
> tif.setCharsetName("UTF-16");
>
> Best, Fabian
>
> 2018-08-08 17:45 GMT+02:00 David Dreyfus :
>
>> Hello -
>>
>> It does not appear that Flink supports a charset encoding of "UTF-16". In
>> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM)
>> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. Are there any
>> plans to enhance Flink to handle UTF-16 with BOM?
>>
>> Thank you,
>> David
>>
>
>


2 node cluster reading file from ftp

2018-08-09 Thread Mohan mohan
Hi,

Context:
# Started cluster with 2 nodes
 node1 (Master & slave)
 node2 (Slave)

# Source path to CsvInputFormat is like
"ftp://test:test@x.x.x.x:5678/source.csv"
and DataSet parallelism is 2

# Path to FileOutputFormat is like "ftp://test:test@x.x.x.x:5678/target.csv"
and DataSink parallelism is 1

# ExecutionEnvironment.createRemoteEnvironment("x.x.x.x", 6123,
"xxx-DEVELOP-SNAPSHOT-fat.jar");

# Flink version 1.4.0

While executing in the above environment, the transformation is not
successful; below is the stack trace:


org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
at
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
at
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
at
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
at
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
at
com.adaequare.etl2.batch.transform.ETLTransformer.transform(ETLTransformer.java:128)
at etl2.batch.EmployeeSamples.basic(EmployeeSamples.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by:
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition dcf1cf429b3b4fe14b98b34bdcde3644@f17a5b9a7a9afd093213ca0d05f9ebdc not found.
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:270)
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:172)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:392)
at
org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1300)
at

Table API, custom window

2018-08-09 Thread Oleksandr Nitavskyi
Hello guys,

I am curious, is there a way to define custom windows
(assigners/trigger/evictor) for the Table/SQL Flink API? It looks like the
documentation is silent about this, but are there plans for it? Or should we
go with the DataStream API in case we need this kind of functionality?

Thanks
Oleksandr Nitavskyi


Re: Heap Problem with Checkpoints

2018-08-09 Thread Piotr Nowojski
Hi,

Thanks for getting back with more information.

Apparently this is a known JDK bug since 2003 that is still not resolved:
https://bugs.java.com/view_bug.do?bug_id=4872014 

https://bugs.java.com/view_bug.do?bug_id=6664633 


The code that is using this `deleteOnExit` is not part of Flink, but of an
external library that we are using (hadoop-aws:2.8.x), so we can not fix it
for them and this bug should be reported/forwarded to them (I have already
done just that). More interestingly, S3AOutputStream is already manually
deleting those files when they are no longer needed in
`org.apache.hadoop.fs.s3a.S3AOutputStream#close`'s finally block:

} finally {
  if (!backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
  }
  super.close();
}

But this doesn’t remove the entry from DeleteOnExitHook. 

From what I see in the code, the flink-s3-fs-presto filesystem implementation
that we provide doesn't use deleteOnExit, so if you can switch to this
filesystem it would solve the problem for you.
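
For reference, a rough sketch of that switch (bucket and path are
placeholders): with the flink-s3-fs-presto jar moved from opt/ to lib/ on all
nodes, checkpoints written to an s3:// URI should go through the Presto-based
filesystem instead of the hadoop-aws S3AOutputStream path discussed above.

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Placeholder bucket/path; credentials come from the usual s3 configuration keys.
env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"))
env.enableCheckpointing(60000L) // e.g. a checkpoint every 60 s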

Piotrek

> On 9 Aug 2018, at 12:09, Ayush Verma  wrote:
> 
> Hello Piotr, I work with Fabian and have been investigating the memory leak
> associated with issues mentioned in this thread. I took a heap dump of our
> master node and noticed that there was >1gb (and growing) worth of entries
> in the set, /files/, in class *java.io.DeleteOnExitHook*.
> Almost all the strings in this set look like,
> /tmp/hadoop-root/s3a/output-*.tmp.
> 
> This means that the checkpointing code, which uploads the data to s3,
> maintains it in a temporary local file, which is supposed to be deleted on
> exit of the JVM. In our case, the checkpointing is quite heavy and because
> we have a long running flink cluster, it causes this /set/ to grow
> unbounded, eventually cause an OOM. Please see these images:
> 
>  
> 
>  
> 
> The culprit seems to be *org.apache.hadoop.fs.s3a.S3AOutputStream*, which
> in-turn, calls
> *org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite()*. If we
> follow the method call chain from there, we end up at
> *org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite()*, where we
> can see the temp file being created and the method deleteOnExit() being
> called.
> 
> Maybe instead of relying on *deleteOnExit()* we can keep track of these tmp
> files, and as soon as they are no longer required, delete them ourself.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Window state with rocksdb backend

2018-08-09 Thread Stefan Richter
Hi,

it is not quite clear to me what your window function is doing, so sharing some
(pseudo) code would be helpful. Is it ever calling an update function for the
state you are trying to modify? From the information I have, it seems not to be
the case, and that is a wrong use of the API, which requires you to call the
update method. This wrong use somewhat seems to work for heap-based backends,
because you are manipulating the objects directly (for efficiency reasons;
otherwise we would always have to make deep defensive copies), but it will not
work for RocksDB because you always just work on a deserialized copy of the
ground truth, and that is why updates are explicit.
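
To make that concrete, a hedged sketch (the types and the update logic are
placeholders, not your actual job): keep whatever you mutate across firings in
explicit keyed state and call update() on it, rather than mutating the objects
coming out of the iterator. Note that per-window state (ctx.windowState) is not
available for merging (session) windows, so this uses the keyed global state
and would need its own cleanup.

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Event(key: String)
case class Result(key: String, newElements: Long)

class TrackingWindowFn extends ProcessWindowFunction[Event, Result, String, TimeWindow] {
  private val seenDesc = new ValueStateDescriptor[Long]("seen-so-far", classOf[Long])

  override def process(key: String, ctx: Context,
                       elements: Iterable[Event], out: Collector[Result]): Unit = {
    val seen = ctx.globalState.getState(seenDesc)
    val alreadySeen = seen.value()                 // 0 when the state is empty
    val total = elements.size.toLong
    seen.update(total)                             // explicit update -> persisted by RocksDB
    out.collect(Result(key, total - alreadySeen))  // elements not handled by earlier fires
  }
}

The important part is the explicit seen.update(...): with RocksDB, nothing you
do to the iterated objects is written back unless you go through the state API.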

Best,
Stefan

> On 09.08.2018 at 10:36, 祁明良 wrote:
> 
> Hi all,
> 
> This is mingliang, I got a problem with rocksdb backend.
> 
> I'm currently using a 15min SessionWindow which also fires every 10s, there's 
> no pre-aggregation, so the input of WindowFunction would be the whole 
> Iterator of input object.
> For window operator, I assume this collection is also a state that maintained 
> by Flink.
> Then, in each 10s fire, the window function will take the objects out from 
> iterator and do some update, and in next fire, I assume I would get the 
> updated value of that object.
> With File system backend it was successful but eats a lot of memory and 
> finally I got GC overhead limit, then I switch to rocksdb backend and the 
> problem is the object in the next fire round is not updated by the previous 
> fire round.
> Do I have to do some additional staff with rocksdb backend in this case?
> 
> Thanks in advance
> Mingliang
> 
> 
>  
> This communication may contain privileged or other confidential information 
> of Red. If you have received it in error, please advise the sender by reply 
> e-mail and immediately delete the message and any attachments without copying 
> or disclosing the contents. Thank you.



Re: Window state with rocksdb backend

2018-08-09 Thread Aljoscha Krettek
Hi,

the objects you get in the WindowFunction are not supposed to be mutated. Any 
changes to them are not guaranteed to be synced back to the state backend.

Why are you modifying the objects? Maybe there's another way of achieving 
what you want to do.

Best,
Aljoscha

> On 9. Aug 2018, at 10:36, 祁明良  wrote:
> 
> Hi all,
> 
> This is mingliang, I got a problem with rocksdb backend.
> 
> I'm currently using a 15min SessionWindow which also fires every 10s, there's 
> no pre-aggregation, so the input of WindowFunction would be the whole 
> Iterator of input object.
> For window operator, I assume this collection is also a state that maintained 
> by Flink.
> Then, in each 10s fire, the window function will take the objects out from 
> iterator and do some update, and in next fire, I assume I would get the 
> updated value of that object.
> With File system backend it was successful but eats a lot of memory and 
> finally I got GC overhead limit, then I switch to rocksdb backend and the 
> problem is the object in the next fire round is not updated by the previous 
> fire round.
> Do I have to do some additional staff with rocksdb backend in this case?
> 
> Thanks in advance
> Mingliang
> 
> 
>  
> This communication may contain privileged or other confidential information 
> of Red. If you have received it in error, please advise the sender by reply 
> e-mail and immediately delete the message and any attachments without copying 
> or disclosing the contents. Thank you.



Re: [ANNOUNCE] Apache Flink 1.6.0 released

2018-08-09 Thread vino yang
Congratulations!

Great work! Till, thank you for advancing the smooth release of Flink 1.6.

Vino.

Till Rohrmann  wrote on Thu, Aug 9, 2018 at 7:21 PM:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.6.0.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this release:
> https://flink.apache.org/news/2018/08/09/release-1.6.0.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12342760
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Till
>


[ANNOUNCE] Apache Flink 1.6.0 released

2018-08-09 Thread Till Rohrmann
The Apache Flink community is very happy to announce the release of Apache
Flink 1.6.0.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this release:
https://flink.apache.org/news/2018/08/09/release-1.6.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12342760

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Till


Re: Heap Problem with Checkpoints

2018-08-09 Thread Ayush Verma
Hello Piotr, I work with Fabian and have been investigating the memory leak
associated with the issues mentioned in this thread. I took a heap dump of our
master node and noticed that there was >1gb (and growing) worth of entries
in the set "files" in the class java.io.DeleteOnExitHook.
Almost all the strings in this set look like
/tmp/hadoop-root/s3a/output-*.tmp.

This means that the checkpointing code, which uploads the data to s3,
maintains it in a temporary local file, which is supposed to be deleted on
exit of the JVM. In our case, the checkpointing is quite heavy and, because
we have a long-running flink cluster, it causes this set to grow
unbounded, eventually causing an OOM.

[heap dump screenshots omitted]

The culprit seems to be org.apache.hadoop.fs.s3a.S3AOutputStream, which
in turn calls
org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(). If we
follow the method call chain from there, we end up at
org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(), where we
can see the temp file being created and the method deleteOnExit() being
called.

Maybe instead of relying on deleteOnExit() we can keep track of these tmp
files, and as soon as they are no longer required, delete them ourselves.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: JDBCInputFormat and SplitDataProperties

2018-08-09 Thread Alexis Sarda
Hi Fabian,

Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
declares javaSet as private[flink], so I cannot access it directly.
Nevertheless, I managed to get around it by using the java environment:

val env =
  org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment

val inputFormat = getInputFormat(query, dbUrl, properties)
val outputFormat = getOutputFormat(dbUrl, properties)

val source = env.createInput(inputFormat)
val sdp = source.getSplitDataProperties
sdp.splitsPartitionedBy(0)
sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))

// transform java DataSet to scala DataSet...
new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
  .groupBy(0, 1)
  .combineGroup(groupCombiner)
  .withForwardedFields("f0->_1")
  .groupBy(0, 1)
  .reduceGroup(groupReducer)
  .withForwardedFields("_1")
  .output(outputFormat)

It seems to work well, and the semantic annotation does remove a hash
partition from the execution plan.

Regards,
Alexis.


On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske  wrote:

> Hi Alexis,
>
> The Scala API does not expose a DataSource object but only a Scala DataSet
> which wraps the Java object.
> You can get the SplitDataProperties from the Scala DataSet as follows:
>
> val dbData: DataSet[...] = ???
> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>
> So you first have to get the wrapped Java DataSet, cast it to DataSource
> and then get the properties.
> It's not very nice, but should work.
>
> In order to use SDPs, you should be a bit familiar how physical data
> properties are propagated and discarded in the optimizer.
> For example, applying a simple MapFunction removes all properties because
> the function might have changed the fields on which a DataSet is
> partitioned or sorted.
> You can expose the behavior of a function to the optimizer by using
> Semantic Annotations [1]
>
> Some comments on the code and plan you shared:
> - You might want to add hostname to ORDER BY to have the output grouped by
> (ts, hostname).
> - Check the Global and Local data properties in the plan to validate that
> the SDP were correctly interpreted.
> - If the data is already correctly partitioned and sorted, you might not
> need the Combiners. In either case, you properly want to annotate them with
> Forward Field annoations.
>
> The number of source tasks is unrelated to the number of splits. If you
> have more tasks than splits, some tasks won't process any data.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>
>
> 2018-08-08 14:10 GMT+02:00 Alexis Sarda :
>
>> Hi Fabian,
>>
>> Thanks for the clarification. I have a few remarks, but let me provide
>> more concrete information. You can find the query I'm using, the
>> JDBCInputFormat creation, and the execution plan in this github gist:
>>
>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>
>> I cannot call getSplitDataProperties because env.createInput(inputFormat)
>> returns a DataSet, not a DataSource. In the code, I do this instead:
>>
>> val javaEnv =
>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>> "example")
>>
>> which feels wrong (the constructor doesn't accept a Scala environment).
>> Is there a better alternative?
>>
>> I see absolutely no difference in the execution plan whether I use SDP or
>> not, so therefore the results are indeed the same. Is this expected?
>>
>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>> that the constructor for GenericInputSplit takes two parameters:
>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>> 2 splits divided into 24 partitions?
>>
>> Regards,
>> Alexis.
>>
>>
>>
>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske  wrote:
>>
>>> Hi Alexis,
>>>
>>> First of all, I think you leverage the partitioning and sorting
>>> properties of the data returned by the database using SplitDataProperties.
>>> However, please be aware that SplitDataProperties are a rather
>>> experimental feature.
>>>
>>> If used without query parameters, the JDBCInputFormat generates a single
>>> split and queries the database just once. If you want to leverage
>>> parallelism, you have to specify a query with parameters in the WHERE
>>> clause to read different parts of the table.
>>> Note, depending on the configuration of the database, multiple queries
>>> result in multiple full scans. Hence, it might make sense to have an index
>>> on the partitioning columns.
>>>
>>> If properly configured, the JDBCInputFormat generates multiple splits
>>> which are partitioned. Since the partitioning is encoded in the query, it
>>> is opaque to Flink and must be explicitly declared.
>>> This can be done with SDPs. The SDP.splitsPartitionedBy() 

Re: Using sensitive configuration/credentials

2018-08-09 Thread vino yang
Hi Chesnay,

Oh, I did not know about this feature. Is there any more description of it in
the official Flink documentation?

Thanks, vino.

Chesnay Schepler  wrote on Thu, Aug 9, 2018 at 4:29 PM:

> If you change the name of your configuration key to include "secret" or
> "password", it should be hidden from the logs and UI.
>
> On 09.08.2018 04:28, vino yang wrote:
>
> Hi Matt,
>
> Flink is currently enhancing its security, such as the current data
> transmission can be configured with SSL mode[1].
> However, some problems involving configuration and web ui display do
> exist, and they are still displayed in plain text.
> I think a temporary way to do this is to keep your secret configuration in
> encrypted form elsewhere, such as Zookeeper or RDBMS, and then dynamically
> read it into the job in a UDF (in the open method).
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/security-ssl.html
>
> Thanks, vino.
>
Matt Moore  wrote on Thu, Aug 9, 2018 at 1:54 AM:
>
>> I'm wondering what the best practice is for using secrets in a Flink
>> program, and I can't find any info in the docs or posted anywhere else.
>>
>> I need to store an access token to one of my APIs for flink to use to
>> dump results into, and right now I'm passing it through as a configuration
>> parameter, but that doesn't seem like the most secure thing to do and the
>> value shows up in the Flink Dashboard under Configuration which is less
>> than ideal.
>>
>> Has anyone else dealt with a situation like this?
>>
>> Thanks,
>>
>>
>


Window state with rocksdb backend

2018-08-09 Thread 祁明良
Hi all,


This is mingliang, I got a problem with rocksdb backend.


I'm currently using a 15min SessionWindow which also fires every 10s. There's
no pre-aggregation, so the input of the WindowFunction would be the whole
Iterator of input objects.

For the window operator, I assume this collection is also state that is
maintained by Flink.

Then, on each 10s fire, the window function takes the objects out of the
iterator and does some updates, and on the next fire, I assume I would get the
updated value of those objects.

With the file system backend this was successful but ate a lot of memory, and
finally I hit the GC overhead limit. I then switched to the rocksdb backend,
and the problem is that the objects in the next fire round are not updated by
the previous fire round.

Do I have to do some additional stuff with the rocksdb backend in this case?


Thanks in advance

Mingliang


This communication may contain privileged or other confidential information of 
Red. If you have received it in error, please advise the sender by reply e-mail 
and immediately delete the message and any attachments without copying or 
disclosing the contents. Thank you.


Re: Using sensitive configuration/credentials

2018-08-09 Thread Chesnay Schepler
If you change the name of your configuration key to include "secret" or
"password", it should be hidden from the logs and UI.


On 09.08.2018 04:28, vino yang wrote:

Hi Matt,

Flink is currently enhancing its security; for example, data
transmission can now be configured with SSL mode [1].
However, some problems involving configuration and web UI display do
exist, and such values are still displayed in plain text.
I think a temporary way to handle this is to keep your secret
configuration in encrypted form elsewhere, such as ZooKeeper or an RDBMS,
and then dynamically read it into the job in a UDF (in the open method),
as in the sketch after the link below.


https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/security-ssl.html
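
A hedged sketch of what I mean (SecretStore is a hypothetical stand-in for
whatever encrypted lookup you use; here it just reads an environment variable):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical stand-in; a real implementation might decrypt a value
// fetched from ZooKeeper or an RDBMS instead.
object SecretStore {
  def fetch(name: String): String = sys.env.getOrElse(name, "")
}

class ApiCallMapper extends RichMapFunction[String, String] {
  @transient private var apiToken: String = _

  override def open(parameters: Configuration): Unit = {
    // Resolved at runtime, so the token never appears in the job
    // configuration or in the web UI.
    apiToken = SecretStore.fetch("MY_API_TOKEN")
  }

  override def map(value: String): String = {
    // use apiToken here when calling the external API
    value
  }
}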

Thanks, vino.

Matt Moore (m...@mattdoescode.com) wrote on Thu, Aug 9, 2018 at 1:54 AM:


I'm wondering what the best practice is for using secrets in a
Flink program, and I can't find any info in the docs or posted
anywhere else.

I need to store an access token to one of my APIs for flink to use
to dump results into, and right now I'm passing it through as a
configuration parameter, but that doesn't seem like the most
secure thing to do and the value shows up in the Flink Dashboard
under Configuration which is less than ideal.

Has anyone else dealt with a situation like this?

Thanks,





Re: JDBCInputFormat and SplitDataProperties

2018-08-09 Thread Fabian Hueske
Hi Alexis,

The Scala API does not expose a DataSource object but only a Scala DataSet
which wraps the Java object.
You can get the SplitDataProperties from the Scala DataSet as follows:

val dbData: DataSet[...] = ???
val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties

So you first have to get the wrapped Java DataSet, cast it to DataSource
and then get the properties.
It's not very nice, but should work.

In order to use SDPs, you should be a bit familiar with how physical data
properties are propagated and discarded in the optimizer.
For example, applying a simple MapFunction removes all properties because
the function might have changed the fields on which a DataSet is
partitioned or sorted.
You can expose the behavior of a function to the optimizer by using
Semantic Annotations [1].

Some comments on the code and plan you shared:
- You might want to add hostname to ORDER BY to have the output grouped by
(ts, hostname).
- Check the Global and Local data properties in the plan to validate that
the SDPs were correctly interpreted.
- If the data is already correctly partitioned and sorted, you might not
need the Combiners. In either case, you probably want to annotate them with
Forwarded Fields annotations.

The number of source tasks is unrelated to the number of splits. If you
have more tasks than splits, some tasks won't process any data.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations


2018-08-08 14:10 GMT+02:00 Alexis Sarda :

> Hi Fabian,
>
> Thanks for the clarification. I have a few remarks, but let me provide
> more concrete information. You can find the query I'm using, the
> JDBCInputFormat creation, and the execution plan in this github gist:
>
> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>
> I cannot call getSplitDataProperties because env.createInput(inputFormat)
> returns a DataSet, not a DataSource. In the code, I do this instead:
>
> val javaEnv =
> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
> "example")
>
> which feels wrong (the constructor doesn't accept a Scala environment). Is
> there a better alternative?
>
> I see absolutely no difference in the execution plan whether I use SDP or
> not, so therefore the results are indeed the same. Is this expected?
>
> My ParameterValuesProvider specifies 2 splits, yet the execution plan
> shows Parallelism=24. Even the source code is a bit ambiguous, considering
> that the constructor for GenericInputSplit takes two parameters:
> partitionNumber and totalNumberOfPartitions. Should I assume that there are
> 2 splits divided into 24 partitions?
>
> Regards,
> Alexis.
>
>
>
> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske  wrote:
>
>> Hi Alexis,
>>
>> First of all, I think you leverage the partitioning and sorting
>> properties of the data returned by the database using SplitDataProperties.
>> However, please be aware that SplitDataProperties are a rather
>> experimental feature.
>>
>> If used without query parameters, the JDBCInputFormat generates a single
>> split and queries the database just once. If you want to leverage
>> parallelism, you have to specify a query with parameters in the WHERE
>> clause to read different parts of the table.
>> Note, depending on the configuration of the database, multiple queries
>> result in multiple full scans. Hence, it might make sense to have an index
>> on the partitioning columns.
>>
>> If properly configured, the JDBCInputFormat generates multiple splits
>> which are partitioned. Since the partitioning is encoded in the query, it
>> is opaque to Flink and must be explicitly declared.
>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
>> Flink that all records with the same value in the partitioning field are
>> read from the same split, i.e, the full data is partitioned on the
>> attribute across splits.
>> The same can be done for ordering if the queries of the JDBCInputFormat
>> is specified with an ORDER BY clause.
>> Partitioning and grouping are two different things. You can define a
>> query that partitions on hostname and orders by hostname and timestamp and
>> declare these properties in the SDP.
>>
>> You can get a SDP object by calling DataSource.getSplitDataProperties().
>> In your example this would be source.getSplitDataProperties().
>>
>> Whatever you do, you should carefully check the execution plan
>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1]
>> and validate that the result are identical whether you use SDP or not.
>>
>> Best, Fabian
>>
>> [1] https://flink.apache.org/visualizer/
>>
>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda :
>>
>>> Hi everyone,
>>>
>>> I have the following scenario: I have a database table with 3 columns: a
>>> host (string), a timestamp, and an integer ID. Conceptually, what I'd like
>>> to do is:
>>>
>>> group by host and 

Re: UTF-16 support for TextInputFormat

2018-08-09 Thread Fabian Hueske
Hi David,

Did you try to set the encoding on the TextInputFormat with

TextInputFormat tif = ...
tif.setCharsetName("UTF-16");

Best, Fabian

2018-08-08 17:45 GMT+02:00 David Dreyfus :

> Hello -
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM)
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. Are there any
> plans to enhance Flink to handle UTF-16 with BOM?
>
> Thank you,
> David
>


Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-09 Thread vino yang
Hi Juho,

We use the REST client API: triggerSavepoint(). This API returns a
CompletableFuture, and then we call its get() method.

In other words, I wait for the savepoint to complete synchronously, because
cancelWithSavepoint also waits for the savepoint to complete before it
executes the cancel command.
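
Roughly (simplified; clusterClient, jobId and savepointDir are placeholders,
and the ClusterClient construction is omitted):

// Trigger the savepoint via the ClusterClient's REST-based API and block
// until it is written, then cancel the job.
val savepointPath: String = clusterClient
  .triggerSavepoint(jobId, savepointDir) // CompletableFuture[String]
  .get()
clusterClient.cancel(jobId)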

We do not use the CLI. Since you are going through the CLI, I think you can
observe whether the savepoint is complete by checking the log or the web UI.

Thanks, vino.


Juho Autio  wrote on Thu, Aug 9, 2018 at 3:07 PM:

> Thanks for the suggestion. Is the separate savepoint triggering async?
> Would you then separately poll for the savepoint's completion before
> executing cancel? If additional polling is needed, then I would say that
> for my purpose it's still easier to call cancel with savepoint and simply
> ignore the result of the call. I would assume that it won't do any harm if
> I keep retrying cancel with savepoint until the job stops – I expect that
> an overlapping cancel request is ignored if the job is already creating a
> savepoint. Please correct if my assumption is wrong.
>
> On Thu, Aug 9, 2018 at 5:04 AM vino yang  wrote:
>
>> Hi Juho,
>>
>> This problem does exist, I suggest you separate these two steps to
>> temporarily deal with this problem:
>> 1) Trigger Savepoint separately;
>> 2) execute the cancel command;
>>
>> Hi Till, Chesnay:
>>
>> Our internal environment and multiple users on the mailing list have
>> encountered similar problems.
>>
>> In our environment, it seems that JM shows that the save point is
>> complete and JM has stopped itself, but the client will still connect to
>> the old JM and report a timeout exception.
>>
>> Thanks, vino.
>>
>>
>> Juho Autio  wrote on Wed, Aug 8, 2018 at 9:18 PM:
>>
>>> I was trying to cancel a job with savepoint, but the CLI command failed
>>> with "akka.pattern.AskTimeoutException: Ask timed out".
>>>
>>> The stack trace reveals that ask timeout is 10 seconds:
>>>
>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
>>> Sender[null] sent message of type
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>>
>>> Indeed it's documented that the default value for akka.ask.timeout="10
>>> s" in
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka
>>>
>>> Behind the scenes the savepoint creation & job cancellation succeeded,
>>> that was to be expected, kind of. So my problem is just getting a proper
>>> response back from the CLI call instead of timing out so eagerly.
>>>
>>> To be exact, what I ran was:
>>>
>>> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
>>> yarn-cluster -yid application_1533676784032_0001 --withSavepoint
>>>
>>> Should I change the akka.ask.timeout to have a longer timeout? If yes,
>>> can I override it just for the CLI call somehow? Maybe it might have
>>> undesired side-effects if set globally for the actual flink jobs to use?
>>>
>>> What about akka.client.timeout? The default for it is also rather
>>> low: "60 s". Should it also be increased accordingly if I want to accept
>>> longer than 60 s for savepoint creation?
>>>
>>> Finally, that default timeout is so low that I would expect this to be a
>>> common problem. I would say that Flink CLI should have higher default
>>> timeout for cancel and savepoint creation ops.
>>>
>>> Thanks!
>>>
>>
>


Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-09 Thread Juho Autio
Thanks for the suggestion. Is the separate savepoint triggering async?
Would you then separately poll for the savepoint's completion before
executing cancel? If additional polling is needed, then I would say that
for my purpose it's still easier to call cancel with savepoint and simply
ignore the result of the call. I would assume that it won't do any harm if
I keep retrying cancel with savepoint until the job stops – I expect that
an overlapping cancel request is ignored if the job is already creating a
savepoint. Please correct if my assumption is wrong.

On Thu, Aug 9, 2018 at 5:04 AM vino yang  wrote:

> Hi Juho,
>
> This problem does exist, I suggest you separate these two steps to
> temporarily deal with this problem:
> 1) Trigger Savepoint separately;
> 2) execute the cancel command;
>
> Hi Till, Chesnay:
>
> Our internal environment and multiple users on the mailing list have
> encountered similar problems.
>
> In our environment, it seems that JM shows that the save point is complete
> and JM has stopped itself, but the client will still connect to the old JM
> and report a timeout exception.
>
> Thanks, vino.
>
>
> Juho Autio  wrote on Wed, Aug 8, 2018 at 9:18 PM:
>
>> I was trying to cancel a job with savepoint, but the CLI command failed
>> with "akka.pattern.AskTimeoutException: Ask timed out".
>>
>> The stack trace reveals that ask timeout is 10 seconds:
>>
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>
>> Indeed it's documented that the default value for akka.ask.timeout="10
>> s" in
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka
>>
>> Behind the scenes the savepoint creation & job cancellation succeeded,
>> that was to be expected, kind of. So my problem is just getting a proper
>> response back from the CLI call instead of timing out so eagerly.
>>
>> To be exact, what I ran was:
>>
>> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
>> yarn-cluster -yid application_1533676784032_0001 --withSavepoint
>>
>> Should I change the akka.ask.timeout to have a longer timeout? If yes,
>> can I override it just for the CLI call somehow? Maybe it might have
>> undesired side-effects if set globally for the actual flink jobs to use?
>>
>> What about akka.client.timeout? The default for it is also rather
>> low: "60 s". Should it also be increased accordingly if I want to accept
>> longer than 60 s for savepoint creation?
>>
>> Finally, that default timeout is so low that I would expect this to be a
>> common problem. I would say that Flink CLI should have higher default
>> timeout for cancel and savepoint creation ops.
>>
>> Thanks!
>>
>