Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-27 Thread Michael Fong
Hi, shashank agarwal



Not sure if I can fully answer your question, but after digging through some
code, I am not sure the C* connector fully supports Scala case classes + CQL
data mapping at the moment. I may be totally wrong, and you would need to ask
the Flink devs about this. However, I have some toy examples that you could
check out, which use CassandraScalaProductSinkBuilder + a predefined CQL
query + an entity. I am not using a Scala case class, so they may not fit
your need.

You may find the example snippets at
https://github.com/mcfongtw/flink-cassandra-connector-examples/
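
For reference, here is a minimal sketch of that tuple + predefined CQL query
route, assuming the 1.4 flink-connector-cassandra Scala API; the keyspace,
table, and host below are made up and not taken from the repository:

```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

object CassandraTupleSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // A stream of Scala tuples (a scala.Product), e.g. (word, count).
    val words: DataStream[(String, Long)] =
      env.fromElements(("flink", 1L), ("cassandra", 2L))

    // Scala products go through CassandraScalaProductSinkBuilder, which
    // needs an explicit upsert query (keyspace/table are assumptions).
    CassandraSink.addSink(words)
      .setQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")
      .setHost("127.0.0.1")
      .build()

    env.execute("cassandra-sink-sketch")
  }
}
```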

Regards,

On Thu, Dec 28, 2017 at 1:11 PM, Michael Fong  wrote:

> Hi, shashank agarwal
>
>
> AFAIK, on the Java side, for a POJO data type you don't need to set a query
> since the CQL data mapping takes care of that, whereas when dealing with
> Java tuples you do need to provide an upsert query so that Cassandra knows
> what to insert into the table.
> The Scala tuple case is clear, same as Java: provide a CQL query. However,
> I don't know what the situation is with a Scala POJO (case class)...
>
> Regards,
>
> Michael
>


Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-27 Thread Michael Fong
Hi, shashank agarwal


AFAIK, on the Java side, for a POJO data type you don't need to set a query
since the CQL data mapping takes care of that, whereas when dealing with
Java tuples you do need to provide an upsert query so that Cassandra knows
what to insert into the table.
The Scala tuple case is clear, same as Java: provide a CQL query. However,
I don't know what the situation is with a Scala POJO (case class)...

Regards,

Michael


Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-27 Thread Hao Sun
Thanks! Great to know I do not have to worry about duplicates inside Flink.

One more question: why does this happen? Because the TM and JM check
leadership at different intervals?
> The TM noticed the loss of leadership before the JM did.

On Wed, Dec 27, 2017, 13:52 Ufuk Celebi  wrote:

> On Wed, Dec 27, 2017 at 4:41 PM, Hao Sun  wrote:
>
>> Somehow the TM detected the JM leadership loss via ZK and disconnected itself?
>> And a couple of seconds later, the JM failed to connect to ZK?
>>
>
> Yes, exactly as you describe. The TM noticed the loss of leadership before
> the JM did.
>
>
>> In the end the cluster recovered nicely on its own, but I am wondering:
>> does this break the exactly-once semantics? If yes, what should I take care of?
>>
>
> Great :-) It does not break exactly-once guarantees *within* the Flink
> pipeline as the state of the latest completed checkpoint will be restored
> after recovery. This rewinds your job and might result in duplicate or
> changed output if you don't use an exactly-once or idempotent sink.
>
> – Ufuk
>
>


Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-27 Thread Ufuk Celebi
On Wed, Dec 27, 2017 at 4:41 PM, Hao Sun  wrote:

> Somehow the TM detected the JM leadership loss via ZK and disconnected itself?
> And a couple of seconds later, the JM failed to connect to ZK?
>

Yes, exactly as you describe. The TM noticed the loss of leadership before
the JM did.


> In the end the cluster recovered nicely on its own, but I am wondering:
> does this break the exactly-once semantics? If yes, what should I take care of?
>

Great :-) It does not break exactly-once guarantees *within* the Flink
pipeline as the state of the latest completed checkpoint will be restored
after recovery. This rewinds your job and might result in duplicate or
changed output if you don't use an exactly-once or idempotent sink.
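
For completeness, a minimal sketch of enabling exactly-once checkpointing
inside the pipeline (the 10s interval is an assumption):

```
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Checkpoint every 10s in exactly-once mode; on failure the job rewinds
// to the latest completed checkpoint, so output after that point may be
// produced again unless the sink is exactly-once or idempotent.
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE)
```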

– Ufuk


org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-27 Thread Hao Sun
Hi, I need some help figuring out the root cause of this error.
I am running Flink 1.3.2 on K8s.

My cluster has been up and running for almost two weeks, and all of a sudden
I see this familiar error again: my task manager is killed/lost. There are
many ways this error can be caused; I need help figuring out the root cause
this time.

From JM.log:

*2017-12-26 14:57:08,624* INFO org.apache.zookeeper.ClientCnxn - Client
session timed out, have not heard from server in 85001ms for sessionid
0x25ddcdec0ef77af, closing socket connection and attempting reconnect
2017-12-26 14:57:23,621 WARN akka.remote.RemoteWatcher - Detected
unreachable: [akka.tcp://flink@fps-flink-taskmanager-960711320-vx0hj:39249]
2017-12-26 14:57:23,623 INFO org.apache.flink.runtime.jobmanager.JobManager
- Task manager 
akka.tcp://flink@fps-flink-taskmanager-960711320-vx0hj:39249/user/taskmanager
terminated.
2017-12-26 14:57:23,624 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source:
KafkaSource(maxwell.users) -> MaxwellFilter->Maxwell(maxwell.users) ->
FixedDelayWatermark(maxwell.users) ->
MaxwellFPSEvent->InfluxDBData(maxwell.users) -> Sink:
influxdbSink(maxwell.users) (1/1) (ddca953ae90906daaae08791e1fde729)
switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
e14186b8fd22699210273f887570a172 @ fps-flink-taskmanager-960711320-vx0hj
(dataPort=37353)


From TM.log:
*2017-12-26 14:56:26,019 INFO* org.apache.flink.runtime.taskmanager.Task -
Source: KafkaSource(maxwell.tickets) ->
MaxwellFilter->Maxwell(maxwell.tickets) ->
FixedDelayWatermark(maxwell.tickets) ->
MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink:
influxdbSink(maxwell.tickets) (1/1) (44b4caf2010bb2b061b67d4f6c8dbc3f)
switched from RUNNING to FAILED.java.lang.Exception: TaskManager
akka://flink/user/taskmanager disconnects from JobManager
akka.tcp://flink@fps-flink-jobmanager:6123/user/jobmanager: *Old JobManager
lost its leadership.*

Somehow the TM detected the JM leadership loss via ZK and disconnected itself?
And a couple of seconds later, the JM failed to connect to ZK?

In the end the cluster recovered nicely on its own, but I am wondering:
does this break the exactly-once semantics? If yes, what should I take care of?

Thanks team!

Full log:
https://gist.github.com/zenhao/e2f9b929f4eaee32f99948d462db7359


Re: MergingWindow

2017-12-27 Thread Ufuk Celebi
Please check your email before sending it next time, as three emails for
the same message is a little spammy ;-)

This is internal code that is used to implement session windows as far
as I can tell. The idea is to not merge the new window as it never had
any state associated with it. The general idea of merging windows is
to keep one of the original windows as the state window, i.e. the
window that is used as namespace to store the window elements.
Elements from the state windows of merged windows must be merged into
this one state window.
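
A sketch of when that guard fires, with assumed window bounds that are not
taken from Flink's code:

```
// Assume session windows with a gap of 5 (illustrative numbers).
//
// Existing window:  W1 = [0, 10), whose state window is also [0, 10).
// A new element at t = 3 creates W2 = [3, 8), entirely inside W1.
// Merging {W1, W2} yields mergeResult = [0, 10); the new window W2 is
// dropped first (it never had state), leaving mergedWindows = {W1}.
//
// Now mergedWindows.contains(mergeResult) && mergedWindows.size() == 1
// holds: the single pre-existing window was "merged" into itself without
// being extended, so mergeFunction.merge(...) is skipped because no
// state windows actually need to be combined.
```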

For more details, this should be directed to the dev mailing list.

– Ufuk

On Tue, Dec 26, 2017 at 4:58 AM, aitozi  wrote:
> Hi,
>
> I can't understand the usage of this snippet of code in
> MergingWindowSet.java; can anyone explain it to me?
>
>
> if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
>     mergeFunction.merge(mergeResult,
>         mergedWindows,
>         this.mapping.get(mergeResult),
>         mergedStateWindows);
> }
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Apache Flink - broadcasting DataStream

2017-12-27 Thread Ufuk Celebi
Hey Mans!

This refers to how subtasks are connected to each other in your
program. If you have a single subtask A1 and three subtasks B1, B2,
B3, broadcast will emit each incoming record at A1 to all of B1, B2, B3:

A1 --+-> B1
     +-> B2
     +-> B3
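
In code, a minimal sketch (values and parallelism are assumptions):

```
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val a: DataStream[Int] = env.fromElements(1, 2, 3)

// With the map running at parallelism 3, every record of `a` reaches
// all three map subtasks (B1, B2, B3 in the picture above).
a.broadcast
  .map(x => x * 2)
  .setParallelism(3)
  .print()

env.execute("broadcast-sketch")
```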

Does this help?

On Mon, Dec 25, 2017 at 7:12 PM, M Singh  wrote:
> 1. What elements get broadcast to the partitions?

Each incoming element is broadcasted.

> 2. What happens as new elements are added to the stream? Are only the new
> elements broadcast?

Yes, each incoming element is broadcasted separately without any history.

> 3. Since the broadcast operation returns a DataStream, can it be used in a
> join? How do new (and old) elements affect the join results?

Yes, should be. Every element is broadcasted only once.

> 4. Similarly, how does broadcast work with connected streams?

Similar to non-connected streams. The incoming records are emitted to
every downstream partition.

– Ufuk


Re: flink yarn-cluster run job --files

2017-12-27 Thread Ufuk Celebi
The file URL needs to be accessible from all nodes, e.g. something
like s3://... or hdfs://...

From the CLI help:

```
Adds a URL to each user code classloader on all nodes in the cluster.
The paths must specify a protocol (e.g. file://) and be accessible on
all nodes (e.g. by means of a NFS share). You can use this option
multiple times for specifying more than one URL. The protocol must be
supported by the {@link java.net.URLClassLoader}.
```

Is this the case?

I don't know whether this would work to access any file you provide though...
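
As a hedged example, an invocation with a cluster-wide reachable URL could
look like this (the jar name, HDFS path, and YARN options are assumptions):

```
flink run -m yarn-cluster -yn 2 \
  --classpath hdfs:///shared/libs/extra-udfs.jar \
  ./my-job.jar
```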



On Mon, Dec 25, 2017 at 2:13 PM,   wrote:
> Hi, all
>
> In Spark, job submission can take --files, which means: "Comma-separated
> list of files to be placed in the working directory of each executor."
>
> So, does Flink have an equivalent method? I used --classpath file:///, but
> the job fails with an error saying the file is not there.
>


Re: Fetching TaskManager log failed

2017-12-27 Thread Ufuk Celebi
Thanks for reporting this issue. A few questions:

- Which version of Flink are you using?
- Does it work up to the point where the exception is thrown? E.g. is it
OK for smaller files?

Let me pull in Chesnay (cc'd) who has worked on log fetching for the
web runtime.

– Ufuk


On Tue, Dec 26, 2017 at 8:50 AM,   wrote:
> I ran a Flink job; after it had run for one hour, there was an error:
>
>  ERROR org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  -
> Fetching TaskManager log failed.
> java.util.NoSuchElementException: None.get
>


Re: How to stop FlinkKafkaConsumer and make job finished?

2017-12-27 Thread Ufuk Celebi
Hey Jaxon,

I don't think it's possible to control this via the life-cycle methods
of your functions.

Note that Flink currently does not support graceful stop in a
meaningful manner and you can only cancel running jobs. A few options that
come to mind to cancel on EOF:

1) Extend the Kafka consumer to stop emitting records after your EOF
record. Look at the flink-connector-kafka-base module. This is
probably not trivial and requires some work to get familiar with the
code. Just putting it out there.

2) Throw a "SuccessException" that fails the job (see the sketch after
this list). Easy, but not nice.

3) Use an Http client and cancel your job via the Http endpoint
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
Easy, but not nice, since you need quite some logic in your function
(e.g. ignore records after EOF record until cancellation, etc.).
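
A minimal sketch of option 2, assuming a string stream and a made-up "EOF"
marker record:

```
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical marker exception: the client inspects the job's failure
// cause and treats this particular "failure" as a successful finish.
class SuccessException extends RuntimeException("end of input reached")

class StopOnEof extends FlatMapFunction[String, String] {
  override def flatMap(value: String, out: Collector[String]): Unit = {
    if (value == "EOF") {         // the "EOF" marker record is an assumption
      throw new SuccessException  // fails, and thereby stops, the job
    }
    out.collect(value)
  }
}
```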

Maybe Aljoscha (cc'd) has an idea how to do this in a better way.

– Ufuk


On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu  wrote:
> I would like to manually stop FlinkKafkaConsumer from consuming data from
> Kafka. But I find it won't close when I invoke the "cancel()" method. What
> I am trying to do is add an EOF symbol meaning the end of my Kafka data,
> and when the FlatMap operator reads the symbol it will invoke the
> FlinkKafkaConsumer "cancel()" method. It doesn't work. A Flink streaming
> job won't finish unless it gets canceled or fails, when I use Kafka as the
> source.
>
> If somebody knows how, please give me some help, thx~~


Re: Flink network access control documentation

2017-12-27 Thread Ufuk Celebi
Hey Elias,

thanks for opening a ticket (for reference:
https://issues.apache.org/jira/browse/FLINK-8311). I fully agree with
adding docs for this. I will try to write something down this week.

Your point about JobManagers only coordinating via ZK is correct
though. I had a look into the JobManager code (as of 1.4) and the
leader election service only reads and writes leader information into
ZK which is then picked up by the TaskManagers.

What you are seeing here is related to the web UI which is attached to
every JM. The UI tries to connect to the leading JM in order to access
runtime information of the leading JM. This is not documented anywhere
as far as I can tell and might have changed between 1.3 and 1.4. The
port should not be critical to the functioning of your Flink cluster,
but only for accessing the web UI on a non-leading JM.
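
For writing such access control rules, a sketch of pinning the relevant
ports in flink-conf.yaml; the values are assumptions and the exact keys
should be double-checked against the 1.4 configuration docs:

```
jobmanager.rpc.port: 6123       # JM RPC; also what a non-leading JM's UI dials
jobmanager.web.port: 8081       # web UI / HTTP monitoring
blob.server.port: 50100-50200   # jar/library distribution to TMs
taskmanager.rpc.port: 6122      # TM RPC (default 0 = random)
taskmanager.data.port: 6121     # TM-to-TM data exchange (default 0 = random)
```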

– Ufuk


On Fri, Dec 22, 2017 at 8:36 PM, Elias Levy  wrote:
> There is a need for better documentation on what connects to what over which
> ports in a Flink cluster to allow users to configure network access control
> rules.
>
> I was under the impression that in a ZK HA configuration the Job Managers
> were essentially independent and only coordinated via ZK.  But starting
> multiple JMs in HA with the JM RPC port blocked between JMs shows that the
> second JM's Akka subsystem is trying to connect to the leading JM:
>
> INFO  akka.remote.transport.ProtocolStateActor  - No
> response from remote for outbound association. Associate timed out after
> [2 ms].
> WARN  akka.remote.ReliableDeliverySupervisor-
> Association with remote system [akka.tcp://flink@10.210.210.127:6123] has
> failed, address is now gated for [5000] ms. Reason: [Association failed with
> [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote
> for outbound association. Associate timed out after [2 ms].]
> WARN  akka.remote.transport.netty.NettyTransport- Remote
> connection to [null] failed with
> org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException:
> connection timed out: /10.210.210.127:6123
>