Re: Broadcast Config through Connected Stream

2017-09-25 Thread Navneeth Krishnan
Thanks a lot Aljoscha. That helps.

On Mon, Sep 25, 2017 at 4:47 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I think this is a valid approach; you can even use "operator state" in
> your map function to make the broadcast config stateful.
>
> Another approach would be to use internal APIs to hack an operator that
> has a keyed stream on one input and a broadcast stream on the second input.
> You can see that approach in action in the Beam Flink Runner [1] but I
> would strongly recommend against doing that because it is using internal
> APIs and if the other approach works for you I would stay with that.
>
> Best,
> Aljoscha
>
> [1] https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L488
>
> On 15. Sep 2017, at 07:04, Navneeth Krishnan 
> wrote:
>
> Hi,
>
> Any suggestions on how this could be achieved?
>
> Thanks
>
> On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> Any suggestions on this would really help.
>>
>> Thanks.
>>
>> On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I looked into an earlier email about the topic broadcast config through
>>> connected stream and I couldn't find the conclusion.
>>>
>>> I can't do the below approach since I need the config to be published to
>>> all operator instances but I need keyed state for external querying.
>>>
>>> streamToBeConfigured.connect(configMessageStream)
>>>     .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
>>>     .flatMap(new FunctionWithConfigurableState())
>>>     .addSink(...);
>>>
>>> One of the resolutions I found in that mail chain is below. I can use
>>> this to solve my issue, but is this still the recommended approach?
>>>
>>> stream1.connect(stream2)
>>>     // Holds transient state of the last ConfigMessage and maps Stream1's
>>>     // data to a Tuple2 that also carries that ConfigMessage
>>>     .map(new MergeStreamsMapFunction())
>>>     // KeyBy Id to allow for ValueStateDescriptors and semantically correct
>>>     // partitioning according to business logic
>>>     .keyBy(new SomeIdKeySelector())
>>>     // Save latest received ConfigMessage-Value in ValueStateDescriptor here
>>>     .flatMap(new StatefulFlatMapFunction())
>>>     .addSink(...);
>>>
>>> Thanks,
>>> Navneeth
>>>
>>
>>
>
>
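
The two-step approach discussed in this thread (remember the latest config in a first step, then key the tagged records and keep per-key state) can be sketched without Flink. This is a plain-Java simulation of the data flow only, not Flink API code: the class BroadcastConfigSketch and its methods are hypothetical names, String stands in for the event and ConfigMessage types, and a HashMap stands in for keyed ValueState.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the data flow; none of these names are Flink APIs.
final class BroadcastConfigSketch {
    // Step 1 (role of MergeStreamsMapFunction): remember the latest config seen.
    private String latestConfig = "default";
    // Step 2 (role of the keyed state): last config stored per key.
    private final Map<String, String> configByKey = new HashMap<>();

    // Called for every element of the config stream.
    void onConfig(String config) {
        latestConfig = config;
    }

    // Called for every element of the data stream: tag it with the latest
    // config, then store that config under the element's key.
    String onEvent(String key, String payload) {
        String tagged = payload + "@" + latestConfig;
        configByKey.put(key, latestConfig);
        return tagged;
    }

    // Role of an external query against the keyed state.
    String configFor(String key) {
        return configByKey.get(key);
    }
}
```

One consequence the sketch makes visible: a key only picks up a new config when its next record arrives, so the state of an idle key keeps the older config value until then.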


Flink on EMR

2017-09-25 Thread Navneeth Krishnan
Hello All,

I'm trying to deploy flink on AWS EMR and I'm very new to EMR. I'm running
into multiple issues and need some help.

*Issue1:*

How did others resolve this multiple bindings issue?


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/11/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/12/location-compute-1.0-SNAPSHOT-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]


*Issue2:*

Running the below command runs the pipeline but the task manager is
allocated with only 5GB memory instead of 8GB memory. Any reason why?
flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8192 ./my-pipeline.jar
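
One possible explanation for the gap in Issue 2, offered as an assumption that nothing in this thread confirms: on YARN, Flink 1.x reserves part of each container for non-heap memory before sizing the TaskManager JVM heap. The sketch below applies such a cutoff, assuming the defaults containerized.heap-cutoff-ratio: 0.25 and containerized.heap-cutoff-min: 600 (MB); both values should be verified against the configuration reference of the Flink version in use. The class and method names are made up for illustration.

```java
// Arithmetic sketch only; the default values used by callers are assumptions.
final class YarnHeapSketch {
    /** Returns the memory (in MB) left for the JVM heap after the container cutoff. */
    static long heapAfterCutoffMb(long containerMb, double cutoffRatio, long cutoffMinMb) {
        // The cutoff is the larger of the ratio-based amount and the minimum.
        long cutoffMb = Math.max(cutoffMinMb, (long) (containerMb * cutoffRatio));
        return containerMb - cutoffMb;
    }
}
```

With these assumed values, heapAfterCutoffMb(8192, 0.25, 600) gives 6144 MB for the -ytm 8192 container above. Further deductions, such as network buffers, could bring the reported figure closer to 5 GB. Treat this as a lead to verify, not as a diagnosis.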


*Issue3:*

How do I provide the checkpoint directory? Will just providing
"hdfs:///checkpoints/" work, or do I need to include the master
node host name?


*Issue 4:*

How can I get the task manager logs? Should I use log aggregation in
hadoop yarn or send it to cloud watch?


Also, if there are any best practices to follow while running Flink on
YARN, please let me know.


Thanks a lot.


Regards,

Navneeth


Questions about checkpoints/savepoints

2017-09-25 Thread vipul singh
Hello,

I have some confusion about checkpoints vs savepoints, and how to use them
effectively in my application.

I am working on an application which relies on Flink's fault-tolerance
mechanism to ensure exactly-once semantics. I have enabled external
checkpointing in my application as below:

env.enableCheckpointing(CHECKPOINT_TIME_MS)

env.setStateBackend(new RocksDBStateBackend(CHECKPOINT_LOCATION))

env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)

Please correct me in case I am wrong, but the above ensures that if the
application crashes, it is able to recover from the last known location.
This, however, won't work if we cancel the application (for new
deployments/restarts).

Reading the documentation about savepoints suggests that it is good practice
to take savepoints at regular intervals (by cron jobs etc.) so that the
application can be restarted from a last known location. It also points to
using the command line option (-s) when cancelling an application, so that
the application stops after saving a savepoint. Based on the above
understanding I have some questions below:

Questions:

   1. It seems to me that checkpoints can be treated as Flink's internal
   recovery mechanism, and savepoints act more as user-defined recovery
   points. Would that be a correct assumption?
   2. When cancelling an application with the -s option, one specifies the
   savepoint location. Is there a way during application startup to identify
   the last known savepoint in a folder automatically, and restart from there?
   Since I am saving my savepoints on S3, I want to avoid issues arising from
   the *ls* command on S3 due to the read-after-write consistency of S3.
   3. Suppose my application has a checkpoint at point t1, and say I cancel
   this application sometime in the future before the next available
   checkpoint (say t1+x). If I start the application without specifying the
   savepoint, it will start from the last known checkpoint (at t1), which
   won't have the application state saved, since I had cancelled the
   application. Would this be a correct assumption?
   4. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be the
   same as manually saving regular savepoints?


Please let me know.

Thanks,
Vipul


java.lang.OutOfMemoryError: Java heap space at com.google.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)

2017-09-25 Thread sohimankotia
Hi,

I am getting a Java heap space error while running a Flink job (Flink 1.2).

Use case: I am getting all keys from Redis matching a specific pattern, then
streaming over those keys, reading the data for each key from Redis, and
writing it to a file in HDFS.

The job was running fine for a few days but suddenly started giving the heap
space error.

Job parallelism: 9 (3 nodes and 3 slots)

Total Redis keys: 1031966 (approx. 33 MB in size, keys only)



Logs : 









FLIP-6: Job specific Docker images status?

2017-09-25 Thread Elias Levy
I was wondering what is the status of support for job specific Docker
images, meaning images that combine the job jars with the job manager, do
not require job submission, and automatically execute the job when there
are enough task managers registered with the job manager to satisfy the
job's parallelism.

I could not tell, as the FLINK-4319 FLIP-6 umbrella issue does not seem to
have an entry for it.


CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

2017-09-25 Thread Hao Sun
Hi, I am running Flink in dev mode through IntelliJ. I have flink-conf.yaml
correctly configured, and from the log you can see the job manager is reading it.

2017-09-25 20:41:52.255 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - *Loading
configuration property: state.backend, rocksdb*
2017-09-25 20:41:52.256 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - Loading
configuration property: state.backend.fs.checkpointdir,
/tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - *Loading
configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/*
2017-09-25 20:41:52.256 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - Loading
configuration property: state.savepoints.dir, /tmp/flink/savepoints/

*But I still somehow get this error*
java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


*My program only has this related to checkpointing*

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.enableCheckpointing(2 * 60 * 1000)


Need some help to dig through this. Thanks

=== Full log =

2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO
 com.zendesk.consul.Consul  - Collecting kafka nodes from
Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:51.946 [main] INFO
 org.apache.flink.api.java.typeutils.TypeExtractor  - class
com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents does not
contain a setter for field events
2017-09-25 20:41:51.946 [main] INFO
 org.apache.flink.api.java.typeutils.TypeExtractor  - class
com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents is not a valid
POJO type because not all fields are valid POJO fields.
2017-09-25 20:41:51.985 [main] INFO
 org.apache.flink.api.java.typeutils.TypeExtractor  - class
com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent does not contain a
setter for field accountId
2017-09-25 20:41:51.985 [main] INFO
 org.apache.flink.api.java.typeutils.TypeExtractor  - class
com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent is not a valid POJO
type because not all fields are valid POJO fields.
2017-09-25 20:41:52.017 [ForkJoinPool-1-worker-13] INFO
 com.zendesk.consul.Consul  - Collecting kafka nodes from
Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:52.198 [main] INFO
 o.a.flink.streaming.api.environment.LocalStreamEnvironment  - Running job
on local embedded Flink mini cluster
2017-09-25 20:41:52.253 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - Loading
configuration property: jobmanager.rpc.address, localhost
2017-09-25 20:41:52.255 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - Loading
configuration property: jobmanager.rpc.port, 6123
2017-09-25 20:41:52.255 [main] INFO
 

Re: high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-25 Thread Elias Levy
Why a range instead of just a single port in HA mode?

On Mon, Sep 25, 2017 at 1:49 PM, Till Rohrmann  wrote:

> Yes, with Flip-6 it will most likely look like how Stephan described it.
> We need the explicit port in standalone mode so that TMs can connect to the
> JM. In the other deployment scenarios, the port can be randomly picked
> unless you want to specify a port range, e.g. for firewall configuration
> purposes.
>


How to clear registered timers for a merged window?

2017-09-25 Thread Yan Zhou [FDS Science]
Hi,

I am implementing a mergeable trigger, and I am having a problem clearing
the registered timers for a merged window (a window that has been merged into
the merge result). In my implementation, the trigger registers multiple
timers for each element in Trigger#onElement(). State is used to keep track
of the registered event times, so that the timers can be removed in
Trigger#clear() later.

However, clearing the registered timers in this way doesn't work if the
window has been merged. The state of the original window is removed during
merging: the method AbstractHeapMergingState#mergeNamespaces() removes the
state of the merged window. I think the ContinuousEventTimeTrigger shipped with
Flink would have the same issue.

My question is: is there a way to keep the state for a merged window? One
way I can think of is to implement a custom heap state that adds the state
back in the AbstractHeapMergingState#mergeState() method. Or is there a way to
clear the timers without using state? Can I tweak the internal timer's
source code to expose a method to remove all timers for a specified window?

Please advise and thank you for your help.

Best
Yan


Re: high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-25 Thread Till Rohrmann
Yes, with Flip-6 it will most likely look like how Stephan described it. We
need the explicit port in standalone mode so that TMs can connect to the
JM. In the other deployment scenarios, the port can be randomly picked
unless you want to specify a port range, e.g. for firewall configuration
purposes.

However, if you look at it closely, then it is mainly a renaming of the
existing configuration parameters: jobmanager.rpc.port ->
standalone.jobmanager.rpc.port and high-availability.jobmanager.port ->
jobmanager.rpc.ports.

Cheers,
Till

On Mon, Sep 25, 2017 at 3:42 PM, Stephan Ewen  wrote:

> /cc Till for real this time ;-)
>
> Hi!
>
> I think that can probably be simplified in the FLIP-6 case:
>
>   - All RPC is only between JM and TM and the port should be completely
> random (optionally within a range). TM and JM discover each other via HA
> (ZK) or the TM gets the JM RPC port as a parameter when the container is
> started.
>   (Parameter should be something like 'jobmanager.rpc.ports: 5-51000')
>
>   - An exception is the standalone non-HA case, because there is no
> service-discovery mechanism. That should probably be a config key like
> 'standalone.jobmanager.rpc.port: 6123'
>
>   - The client calls come via HTTP/REST and should have one specific port
> that may optionally be discovered/redirected via YARN or the dispatchers.
>
> /cc Till for your thoughts
>
> Best,
> Stephan
>
>
> On Mon, Sep 25, 2017 at 3:31 PM, Nico Kruber 
> wrote:
>
>> Hi Elias,
>> indeed that looks strange but was introduced with FLINK-3172 [1] with an
>> argument about using the same configuration key (as opposed to having two
>> different keys as mentioned) starting at
>> https://issues.apache.org/jira/browse/FLINK-3172?focusedCommentId=15091940#comment-15091940
>>
>>
>> Nico
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-3172
>>
>> On Sunday, 24 September 2017 03:04:51 CEST Elias Levy wrote:
>> > I am wondering why in HA mode there is a need for a separate config
>> > parameter to set the JM RPC port (high-availability.jobmanager.port) and
>> > why this parameter accepts a range, unlike jobmanager.rpc.port.
>>
>>
>
>


Flink Job on Docker on Mesos cluster

2017-09-25 Thread Rahul Raj
Hi All,

I am working on a project which involves running Flink jobs in Docker
containers on a Mesos cluster, but I am failing to understand how Docker and
Flink will work together on Mesos. Can anyone explain to me how a Flink
cluster will run in Docker containers on Mesos?

If I create a Docker image which runs my Flink application, how will it
launch the other task managers for the Flink cluster and utilize them? If
anyone can give me an example to follow as a reference, that would be
good.

Rahul Raj


Re: Can i use Avro serializer as default instead of kryo serializer in Flink for scala API ?

2017-09-25 Thread Nico Kruber
Hi Shashank,
enabling Avro as the default de/serializer for Flink should be as simple as 
the following, according to [1]

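import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Force the Avro serializer for POJO types
env.getConfig.enableForceAvro()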

I am, however, no expert on this and the implications regarding the use of 
Avro from inside Scala, so I included Gordon (cc'd) who may know more.



Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html

On Saturday, 23 September 2017 10:11:28 CEST shashank agarwal wrote:
> Hello Team,
> 
> As our schema evolves with changes in business logic, we want to use an
> extensible schema format like Avro as the default serializer and
> deserializer for our Flink program and state.
> 
> My question is: we are using the Scala API in our Flink program, but Avro by
> default supports Java POJOs. So how can we use it with the Scala API? Do we
> have to use a serializer like Avro4s, or can we use default Avro in our
> Scala Flink app, and if so, what are the steps?
> 
> Please guide.




Re: Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

2017-09-25 Thread Timo Walther

Hi Federico,

I think going through a Storm compatibility layer could work, but did
you think about using the flink-jdbc connector? That should be the
easiest solution.


Otherwise I think it would be easier to quickly implement your own
SinkFunction. It is just one method that you have to implement; you
could call some Hive commands there.


Regards,
Timo


On 9/25/17 at 4:16 PM, Nico Kruber wrote:

Hi Federico,
I also did not find any implementation of a Hive sink, nor many details on this
topic in general. Let me forward this to Timo and Fabian (cc'd) who may know
more.

Nico

On Friday, 22 September 2017 12:14:32 CEST Federico D'Ambrosio wrote:

Hello everyone,

I'd like to use the HiveBolt from storm-hive inside a flink job using the
Flink-Storm compatibility layer but I'm not sure how to integrate it. Let
me explain, I would have the following:

val mapper = ...

val hiveOptions = ...

streamByID
   .transform[OUT]("hive-sink", new BoltWrapper[IN, OUT](new HiveBolt(hiveOptions)))

where streamByID is a DataStream[Event].

What would be the IN and OUT types? HiveBolt executes on a Storm Tuple, so
I'd think that IN should be an Event "tuple-d" ( event => (field1, field2,
field3 ...) ), while OUT, since I don't want the stream to keep flowing,
would be null or None?

Alternatively, do you know of any implementation of a Hive sink in Flink,
other than an adaptation of the said HiveBolt in a RichSinkFunction?

Thanks for your attention,
  Federico





Re: Building scala examples

2017-09-25 Thread Nico Kruber
Hi Michael,
from what I see, Java and Scala examples reside in different packages, e.g.
* org.apache.flink.streaming.scala.examples.async.AsyncIOExample vs.
* org.apache.flink.streaming.examples.async.AsyncIOExample

A quick run on the Flink 1.3 branch revealed
flink-examples-streaming_2.10-1.3-SNAPSHOT.jar containing both (which you can
verify with your favorite archiver tool for zip files).

Afaik, there is no simple switch to turn off Java or Scala examples. You may
either adapt the pom.xml or create your own project with the examples and
programming languages you need.


Nico


On Saturday, 23 September 2017 12:45:04 CEST Michael Fong wrote:
> Hi,
> 
> I am studying how to build a Scala program from flink-examples/.
> 
> I can see there are two source folders, java/ and scala/, in IntelliJ, and
> for most examples there is both a Java and a Scala version.
> Executing 'mvn clean package -Pbuild-jar' results in a jar file under
> target/. I am wondering whether that is the Java or the Scala example that
> I just compiled. In addition, is there a way to selectively choose the Java
> or Scala example to build with the current Maven settings?
> 
> Thanks in advance,




Re: Rule expression for CEP library

2017-09-25 Thread Gábor Gévay
Hello Shailesh,

There is a Flink Improvement Proposal for Integration of SQL and CEP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-20:+Integration+of+SQL+and+CEP

Best,
Gábor


On Mon, Sep 25, 2017 at 3:21 PM, Shailesh Jain
 wrote:
> Hi,
>
> Apart from the Java/Scala API for the CEP library, is there any other way to
> express patterns/rules which can be run on flink engine?
>
> Are there any plans on adding a DSL/Rule expression language for CEP anytime
> soon? If not, any pointers on how it can be achieved now would be really
> helpful.
>
> Thanks,
> Shailesh


RE: Flink kafka consumer that read from two partitions in local mode

2017-09-25 Thread Sofer, Tovi
Hi Gordon,

Thanks for your assistance.


· We are running flink currently  in local mode(MiniCluster), using 
flink 1.3.2 and flink-connector-kafka-0.10_2.10.


· In Consumer log I see 1 partition only (when parallelism=1), so the 
problem indeed seems to be in producer.
2017-09-25 17:10:58,140 WARN  org.apache.kafka.clients.consumer.ConsumerConfig 
- [Source: fix_topic -> FixMapper (1/1)] The configuration 'topic.name' was 
supplied but isn't a known config.
2017-09-25 17:10:58,143 INFO  org.apache.kafka.common.utils.AppInfoParser - 
[Source: fix_topic -> FixMapper (1/1)] Kafka version : 0.10.2.1
2017-09-25 17:10:58,144 INFO  org.apache.kafka.common.utils.AppInfoParser - 
[Source: fix_topic -> FixMapper (1/1)] Kafka commitId : e89bffd6b2eff799
2017-09-25 17:10:58,679 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: 
fix_topic -> FixMapper (1/1)] Got 1 partitions from these topics: [fix]
2017-09-25 17:10:58,679 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: 
fix_topic -> FixMapper (1/1)] Consumer is going to read the following topics 
(with number of partitions): fix (1),
2017-09-25 17:10:58,680 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - [Source: 
fix_topic -> FixMapper (1/1)] Consumer subtask 0 will start reading the 
following 1 partitions from the committed group offsets in Kafka: 
[KafkaTopicPartition{topic='fix', partition=0}]


· The producer seems to write to one partition only.

internalProducer.topicPartitionsMap and Cluster.Partitions seem to have one
partition for the FIX topic.

In the producer log, each producer starts with the configuration below:

2017-09-25 17:06:49,596 INFO  org.apache.kafka.clients.producer.ProducerConfig- [Source: random -> Sink: fixTopicSink (2/2)] ProducerConfig values:
    acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 54
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 6
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 6
    metadata.max.age.ms = 30
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 3
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 3
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 6
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl….
    timeout.ms = 3
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

And it prints the following when starting:

2017-09-25 17:07:46,907 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase- [Source: 
random -> Sink: fixTopicSink (2/2)] Starting FlinkKafkaProducer (2/2) to 
produce into default topic fix


Thanks,
Tovi

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Monday, 25 September 2017 15:06
To: Sofer, Tovi [ICG-IT]; Fabian Hueske
Cc: user
Subject: RE: Flink kafka consumer that read from two partitions in local mode

Hi Tovi,

Your code seems to be correct, and as Fabian described, you don’t need 
parallelism of 2 to read 2 partitions; a single parallel instance of the source 
can read multiple partitions.

I’m not sure what could have possibly gone wrong at the moment from a first 
look, so I may need to randomly ask you some questions:

Could you let me know which Flink version you are on?
Also, could you try searching in the log to see if you find consumer logs such 
as:
“Consumer subtask ... will start reading the following (numPartitions) 
partitions from ...: (partition info) "
You can try setting parallelism of the source to 1, and you should see that the 
subtask is reading 2 partitions.

From the metrics log it does seem like the consumer has picked up both 
partitions 0 and 1, but no records seem to be coming from partition 0. Have you 
perhaps tried using a 

Re: Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

2017-09-25 Thread Nico Kruber
Hi Federico,
I also did not find any implementation of a Hive sink, nor many details on this
topic in general. Let me forward this to Timo and Fabian (cc'd) who may know
more.

Nico

On Friday, 22 September 2017 12:14:32 CEST Federico D'Ambrosio wrote:
> Hello everyone,
> 
> I'd like to use the HiveBolt from storm-hive inside a flink job using the
> Flink-Storm compatibility layer but I'm not sure how to integrate it. Let
> me explain, I would have the following:
> 
> val mapper = ...
> 
> val hiveOptions = ...
> 
> streamByID
>   .transform[OUT]("hive-sink", new BoltWrapper[IN, OUT](new HiveBolt(hiveOptions)))
> 
> where streamByID is a DataStream[Event].
> 
> What would be the IN and OUT types? HiveBolt executes on a Storm Tuple, so
> I'd think that IN should be an Event "tuple-d" ( event => (field1, field2,
> field3 ...) ), while OUT, since I don't want the stream to keep flowing,
> would be null or None?
> 
> Alternatively, do you know of any implementation of a Hive sink in Flink,
> other than an adaptation of the said HiveBolt in a RichSinkFunction?
> 
> Thanks for your attention,
>  Federico




Re: History Server

2017-09-25 Thread Nico Kruber
Hi Elias,
in theory, it could be integrated into a single web interface, but this was 
not done so far.
I guess the main reason for keeping it separate was probably to have a better 
separation of concerns as the history server is actually independent of the 
current JobManager execution and merely displays previous job results which 
may also come from different or previously existing JobManager instances which 
stored history data in its storage directory.

Chesnay (cc'd) may elaborate a bit more in case you'd like to change that and 
integrate the history server (interface) into the JobManager.


Nico

On Sunday, 24 September 2017 02:48:40 CEST Elias Levy wrote:
> I am curious, why is the History Server a separate process and Web UI
> instead of being part of the Web Dashboard within the Job Manager?




Re: high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-25 Thread Stephan Ewen
/cc Till for real this time ;-)

Hi!

I think that can probably be simplified in the FLIP-6 case:

  - All RPC is only between JM and TM and the port should be completely
random (optionally within a range). TM and JM discover each other via HA
(ZK) or the TM gets the JM RPC port as a parameter when the container is
started.
  (Parameter should be something like 'jobmanager.rpc.ports: 5-51000')

  - An exception is the standalone non-HA case, because there is no
service-discovery mechanism. That should probably be a config key like
'standalone.jobmanager.rpc.port: 6123'

  - The client calls come via HTTP/REST and should have one specific port
that may optionally be discovered/redirected via YARN or the dispatchers.
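
The "random port, optionally within a range" idea from the first bullet can be sketched in plain Java. This is an illustration under assumptions, not Flink's implementation: PortRangeSketch, parseRange, and bindFirstFree are hypothetical names, and the accepted syntax (a single port or low-high) is a simplification.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Illustration only; names and accepted syntax are assumptions, not Flink code.
final class PortRangeSketch {
    /** Parses "6123" or "50000-50025" into {low, high}, both inclusive. */
    static int[] parseRange(String spec) {
        String[] parts = spec.trim().split("-");
        if (parts.length > 2) {
            throw new IllegalArgumentException("Not a port or port range: " + spec);
        }
        int low = Integer.parseInt(parts[0].trim());
        int high = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : low;
        if (low < 1 || high > 65535 || low > high) {
            throw new IllegalArgumentException("Invalid port range: " + spec);
        }
        return new int[] {low, high};
    }

    /** Binds the first free port in the range; the caller closes the socket. */
    static ServerSocket bindFirstFree(String spec) throws IOException {
        int[] range = parseRange(spec);
        for (int port = range[0]; port <= range[1]; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException portUnavailable) {
                // Port is taken or not permitted; try the next one in the range.
            }
        }
        throw new IOException("No free port in range " + spec);
    }
}
```

A firewall-friendly setup would pass a narrow range such as 50000-50025 (a made-up example), while the standalone case corresponds to a single fixed port such as 6123.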

/cc Till for your thoughts

Best,
Stephan


On Mon, Sep 25, 2017 at 3:31 PM, Nico Kruber  wrote:

> Hi Elias,
> indeed that looks strange but was introduced with FLINK-3172 [1] with an
> argument about using the same configuration key (as opposed to having two
> different keys as mentioned) starting at
> https://issues.apache.org/jira/browse/FLINK-3172?focusedCommentId=15091940#comment-15091940
>
>
> Nico
>
> [1] https://issues.apache.org/jira/browse/FLINK-3172
>
> On Sunday, 24 September 2017 03:04:51 CEST Elias Levy wrote:
> > I am wondering why in HA mode there is a need for a separate config
> > parameter to set the JM RPC port (high-availability.jobmanager.port) and
> > why this parameter accepts a range, unlike jobmanager.rpc.port.
>
>


Re: high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-25 Thread Stephan Ewen
Hi!

I think that can probably be simplified in the FLIP-6 case:

  - All RPC is only between JM and TM and the port should be completely
random (optionally within a range). TM and JM discover each other via HA
(ZK) or the TM gets the JM RPC port as a parameter when the container is
started.
  (Parameter should be something like 'jobmanager.rpc.ports: 5-51000')

  - An exception is the standalone non-HA case, because there is no
service-discovery mechanism. That should probably be a config key like
'standalone.jobmanager.rpc.port: 6123'

  - The client calls come via HTTP/REST and should have one specific port
that may optionally be discovered/redirected via YARN or the dispatchers.

/cc Till for your thoughts

Best,
Stephan


On Mon, Sep 25, 2017 at 3:31 PM, Nico Kruber  wrote:

> Hi Elias,
> indeed that looks strange but was introduced with FLINK-3172 [1] with an
> argument about using the same configuration key (as opposed to having two
> different keys as mentioned) starting at
> https://issues.apache.org/jira/browse/FLINK-3172?focusedCommentId=15091940#comment-15091940
>
>
> Nico
>
> [1] https://issues.apache.org/jira/browse/FLINK-3172
>
> On Sunday, 24 September 2017 03:04:51 CEST Elias Levy wrote:
> > I am wondering why in HA mode there is a need for a separate config
> > parameter to set the JM RPC port (high-availability.jobmanager.port) and
> > why this parameter accepts a range, unlike jobmanager.rpc.port.
>
>


Re: high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-25 Thread Nico Kruber
Hi Elias,
indeed that looks strange but was introduced with FLINK-3172 [1] with an 
argument about using the same configuration key (as opposed to having two 
different keys as mentioned) starting at
https://issues.apache.org/jira/browse/FLINK-3172?focusedCommentId=15091940#comment-15091940


Nico

[1] https://issues.apache.org/jira/browse/FLINK-3172

On Sunday, 24 September 2017 03:04:51 CEST Elias Levy wrote:
> I am wondering why, in HA mode, there is a need for a separate config parameter
> to set the JM RPC port (high-availability.jobmanager.port) and why this
> parameter accepts a range, unlike jobmanager.rpc.port.



Rule expression for CEP library

2017-09-25 Thread Shailesh Jain
Hi,

Apart from the Java/Scala API for the CEP library, is there any other way
to express patterns/rules that can be run on the Flink engine?

Are there any plans on adding a DSL/Rule expression language for CEP
anytime soon? If not, any pointers on how it can be achieved now would be
really helpful.

Thanks,
Shailesh


RE: Flink kafka consumer that read from two partitions in local mode

2017-09-25 Thread Tzu-Li (Gordon) Tai
Hi Tovi,

Your code seems to be correct, and as Fabian described, you don’t need 
parallelism of 2 to read 2 partitions; a single parallel instance of the source 
can read multiple partitions.

From a first look I’m not sure what could have gone wrong, so I need to ask 
you a few questions:

Could you let me know which Flink version you are on?
Also, could you try searching in the log to see if you find consumer logs such 
as:
“Consumer subtask ... will start reading the following (numPartitions) 
partitions from ...: (partition info)”
You can try setting parallelism of the source to 1, and you should see that the 
subtask is reading 2 partitions.
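
As an illustration of why one source subtask can end up with several partitions, here is a minimal sketch of a modulo-style assignment. The class and method names are hypothetical, and the modulo rule is an assumption for illustration; the Kafka connector may distribute partitions differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of distributing Kafka partitions over source subtasks.
final class PartitionAssignment {

    // Returns the partitions a subtask would read under a simple modulo scheme.
    static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            if (partition % parallelism == subtaskIndex) {
                assigned.add(partition);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Parallelism 1: the only subtask reads both partitions.
        System.out.println("p=1, subtask 0: " + partitionsFor(0, 1, 2));
        // Parallelism 2: each subtask reads one partition.
        System.out.println("p=2, subtask 0: " + partitionsFor(0, 2, 2));
        System.out.println("p=2, subtask 1: " + partitionsFor(1, 2, 2));
    }
}
```

Under any scheme of this kind, every partition is read by exactly one subtask, so lowering the parallelism changes who reads a partition but not whether it is read.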

From the metrics log it does seem like the consumer has picked up both 
partitions 0 and 1, but no records seem to be coming from partition 0. Have you 
tried using a non-Flink consumer, for example the simple console consumer, 
to read the topic and check whether records from both partitions are consumed 
properly?

Let me know, I’m sure we can figure this out somehow.

Cheers,
Gordon
On 24 September 2017 at 9:44:28 AM, Sofer, Tovi (tovi.so...@citi.com) wrote:

Thank you Fabian.

 

Fabian, Gordon, am I missing something in consumer setup?

Should I configure consumer in some way to subscribe to two partitions?

 

Thanks and regards,

Tovi

 

From: Fabian Hueske [mailto:fhue...@gmail.com]  
Sent: Tuesday, 19 September 2017 22:58
To: Sofer, Tovi [ICG-IT]
Cc: user; Tzu-Li (Gordon) Tai
Subject: Re: Flink kafka consumer that read from two partitions in local mode

 

Hi Tovi,

your code looks OK to me. Maybe Gordon (in CC) has an idea what is going wrong.

Just a side note: you don't need to set the parallelism to 2 to read from two 
partitions. A single consumer instance can read from multiple partitions.

Best,

Fabian

 

2017-09-19 17:02 GMT+02:00 Sofer, Tovi :

Hi,

 

I am trying to set up a FlinkKafkaConsumer that reads from two partitions in 
local mode, using setParallelism(2).

The producer writes to two partitions (as shown in the metrics report).

But the consumer always seems to read from one partition only.

Am I missing something in partition configuration?

 

Code:

 

Producer setup:
Configuration localConfig = new Configuration();
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);
env.setParallelism(2);
String kafkaPort =
    parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());
SingleOutputStreamOperator<String> fixMsgSource =
    env.addSource(srcMsgProvider.getFixMsgSource(), TypeInformation.of(String.class))
        .name(sourceGenerationType.getValue());
fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:" + kafkaPort,
        TOPIC_NAME, new SimpleStringSchema()))
    .name("fix_topic");
env.execute("MsgSimulatorJob");
 

 

Consumer setup:


String topicName = "fix";
Configuration conf = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(2);
env.getConfig().setGlobalJobParameters(configParams); // make parameters 
available in the web interface
DeserializationSchema> deserializationSchema = new 
SimpleStringAndTimestampDeserializationSchema ();
FlinkKafkaConsumer010> kafkaConsumer = new 
FlinkKafkaConsumer010<>(topicName, deserializationSchema, 
kafkaParams.getProperties());
DataStream> fixMessagesStream = 
env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);
 

As you can see in output, only 1 consumer partition seems to be used:

Producer output:

2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0

2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.1.numRecordsInPerSecond: 19836.0333

2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.0.numRecordsInPerSecond: 20337.9334

2017-09-19 14:40:45,819 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0

Consumer output:

2017-09-19 14:40:45,116 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368

2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637

2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: 

Re: Broadcast Config through Connected Stream

2017-09-25 Thread Aljoscha Krettek
Hi,

I think this is a valid approach; you can even use "operator state" in your map 
function to make the broadcast config stateful.

Another approach would be to use internal APIs to hack an operator that has a 
keyed stream on one input and a broadcast stream on the second input. You can 
see that approach in action in the Beam Flink Runner [1] but I would strongly 
recommend against doing that because it is using internal APIs and if the other 
approach works for you I would stay with that.
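
The map-based approach from the first paragraph can be modelled in plain Java as follows. This is a sketch of the pattern only, not Flink code: ConfigTaggingInstance, onConfig and onData are hypothetical names, and the String types are placeholders. In a real job the two callbacks would correspond to the two methods of a co-map function on the connected streams, with the config stream broadcast so that every parallel instance receives each config message.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Plain-Java model of ONE parallel instance of the merging map function.
// Class and method names are hypothetical; this is not Flink API.
final class ConfigTaggingInstance {
    private String latestConfig; // stays null until the first config message arrives

    // Called for every element of the (broadcast) config stream.
    void onConfig(String config) {
        latestConfig = config;
    }

    // Called for every data element; pairs it with the config seen most recently.
    Map.Entry<String, String> onData(String record) {
        return new SimpleEntry<>(record, latestConfig);
    }

    public static void main(String[] args) {
        // Two parallel instances: a broadcast delivers each config to both of them.
        ConfigTaggingInstance[] instances = {
            new ConfigTaggingInstance(), new ConfigTaggingInstance()
        };
        for (ConfigTaggingInstance instance : instances) {
            instance.onConfig("threshold=10");
        }
        System.out.println(instances[0].onData("event-a"));
        System.out.println(instances[1].onData("event-b"));
    }
}
```

Because each data record leaves this step already paired with a config value, the stream can be keyed afterwards and the value stored in keyed state. Records that arrive before the first config message carry a null config in this model, so a real job would have to decide whether to buffer or drop them.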

Best,
Aljoscha

[1] 
https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L488
 


> On 15. Sep 2017, at 07:04, Navneeth Krishnan  wrote:
> 
> Hi,
> 
> Any suggestions on how this could be achieved?
> 
> Thanks
> 
> On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan wrote:
> Hi All,
> 
> Any suggestions on this would really help. 
> 
> Thanks.
> 
> On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan wrote:
> Hi All,
> 
> I looked into an earlier email about the topic broadcast config through 
> connected stream and I couldn't find the conclusion.
> 
> I can't do the below approach since I need the config to be published to all 
> operator instances but I need keyed state for external querying.
> 
> streamToBeConfigured.connect(configMessageStream)
>   .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
>   .flatMap(new FunctionWithConfigurableState())
>   .addSink(...);
> 
> One of the resolutions I found in that mail chain was below. I can use this to 
> solve my issue but is this still the recommended approach?
> 
> stream1.connect(stream2)
> .map(new MergeStreamsMapFunction()) // Holds transient state of 
> the last ConfigMessage and maps Stream1's data to a Tuple2 ConfigMessage>
> .keyBy(new SomeIdKeySelector()) // KeyBy Id to allow for 
> ValueStateDescriptors and semantically correct partitioning according to 
> business logic
> .flatMap(new StatefulFlatMapFunction()) // Save latest received 
> ConfigMessage-Value in ValueStateDescriptor here
> .addSink(...);
> 
> Thanks,
> Navneeth
> 
> 



Re: Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-25 Thread Federico D'Ambrosio
As a little update, the pattern for the exclusion of those files in
sbt-assembly is the following:

assemblyMergeStrategy in assembly := {
  case PathList(ps @ _*) if ps.last.endsWith(".DSA") ||
      ps.last.endsWith(".SF") || ps.last.endsWith(".RSA") =>
    MergeStrategy.discard
  // Other MergeStrategies
}

2017-09-25 11:48 GMT+02:00 Federico D'Ambrosio <
federico.dambro...@smartlab.ws>:

> Hi Urs,
>
> Thank you very much for your advice, I will look into excluding those
> files directly during the assembly.
>
> 2017-09-25 10:58 GMT+02:00 Urs Schoenenberger <urs.schoenenber...@tngtech.com>:
>
>> Hi Federico,
>>
>> oh, I remember running into this problem some time ago. If I recall
>> correctly, this is not a flink issue, but an issue with technically
>> incorrect jars from dependencies which prevent the verification of the
>> manifest. I was using the maven-shade plugin back then and configured an
>> exclusion for these file types. I assume that sbt/sbt-assembly has a
>> similar option, this should be more stable than manually stripping the
>> jar.
>> Alternatively, you could try to find out which dependency puts the
>> .SF/etc files there and exclude this dependency altogether, it might be
>> a transitive lib dependency that comes with hadoop anyways, or simply
>> one that you don't need anyways.
>>
>> Best,
>> Urs
>>
>> On 25.09.2017 10:09, Federico D'Ambrosio wrote:
>> > Hi Urs,
>> >
>> > Yes the main class is set, just like you said.
>> >
>> > Still, I might have managed to get it working: during the assembly some
>> > .SF, .DSA and .RSA files are put inside the META-INF folder of the jar,
>> > possibly coming from some of the new dependencies in the deps tree.
>> > Apparently, this caused this weird issue. Using an appropriate pattern
>> for
>> > discarding the files during the assembly or removing them via zip -d
>> should
>> > be enough (I sure hope so, since this is some of the worst issues I've
>> come
>> > across).
>> >
>> >
>> > Federico D'Ambrosio
>> >
>> > Il 25 set 2017 9:51 AM, "Urs Schoenenberger" <
>> urs.schoenenber...@tngtech.com>
>> > ha scritto:
>> >
>> >> Hi Federico,
>> >>
>> >> just guessing, but are you explicitly setting the Main-Class manifest
>> >> attribute for the jar that you are building?
>> >>
>> >> Should be something like
>> >>
>> >> mainClass in (Compile, packageBin) :=
>> >> Some("org.yourorg.YourFlinkJobMainClass")
>> >>
>> >> Best,
>> >> Urs
>> >>
>> >>
>> >> On 23.09.2017 17:53, Federico D'Ambrosio wrote:
>> >>> Hello everyone,
>> >>>
>> >>> I'd like to submit to you this weird issue I'm having, hoping you
>> could
>> >>> help me.
>> >>> Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink
>> 1.3.2
>> >>> compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
>> >>> So, I'm trying to implement an sink for Hive so I added the following
>> >>> dependency in my build.sbt:
>> >>>
>> >>> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
>> >>> "1.2.1000.2.6.1.0-129"
>> >>>
>> >>> in order to use hive streaming capabilities.
>> >>>
>> >>> After importing this dependency, not even using it, if I try to flink
>> run
>> >>> the job I get
>> >>>
>> >>> org.apache.flink.client.program.ProgramInvocationException: The
>> >> program's
>> >>> entry point class 'package.MainObj' was not found in the jar file.
>> >>>
>> >>> If I remove the dependency, everything goes back to normal.
>> >>> What is weird is that if I try to use sbt run in order to run job, *it
>> >> does
>> >>> find the Main class* and obviously crash because of the missing flink
>> >> core
>> >>> dependencies (AbstractStateBackend missing and whatnot).
>> >>>
>> >>> Here are the complete dependencies of the project:
>> >>>
>> >>> "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>> >>> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion %
>> >> "provided",
>> >>> "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>> >>> "org.apache.flink" %% "flink-cep-scala" % flinkVersion,
>> >>> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
>> >>> "1.2.1000.2.6.1.0-129",
>> >>> "org.joda" % "joda-convert" % "1.8.3",
>> >>> "com.typesafe.play" %% "play-json" % "2.6.2",
>> >>> "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2",
>> >>> "org.scalactic" %% "scalactic" % "3.0.1",
>> >>> "org.scalatest" %% "scalatest" % "3.0.1" % "test",
>> >>> "de.javakaffee" % "kryo-serializers" % "0.42"
>> >>>
>> >>> Could it be an issue of dependencies conflicts between mongo-hadoop
>> and
>> >>> hive hadoop versions (respectively 2.7.1 and  2.7.3.2.6.1.0-129, even
>> >>> though no issue between mongodb-hadoop and flink)? I'm even starting
>> to
>> >>> think that Flink cannot handle that well big jars (before the new
>> >>> dependency it was 44M, afterwards it became 115M) when it comes to
>> >>> classpath loading?
>> >>>
>> >>> Any help would be really appreciated,
>> >>> Kind regards,
>> >>> Federico
>> >>>

Re: Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-25 Thread Federico D'Ambrosio
Hi Urs,

Thank you very much for your advice, I will look into excluding those files
directly during the assembly.

2017-09-25 10:58 GMT+02:00 Urs Schoenenberger <
urs.schoenenber...@tngtech.com>:

> Hi Federico,
>
> oh, I remember running into this problem some time ago. If I recall
> correctly, this is not a flink issue, but an issue with technically
> incorrect jars from dependencies which prevent the verification of the
> manifest. I was using the maven-shade plugin back then and configured an
> exclusion for these file types. I assume that sbt/sbt-assembly has a
> similar option, this should be more stable than manually stripping the jar.
> Alternatively, you could try to find out which dependency puts the
> .SF/etc files there and exclude this dependency altogether, it might be
> a transitive lib dependency that comes with hadoop anyways, or simply
> one that you don't need anyways.
>
> Best,
> Urs
>
> On 25.09.2017 10:09, Federico D'Ambrosio wrote:
> > Hi Urs,
> >
> > Yes the main class is set, just like you said.
> >
> > Still, I might have managed to get it working: during the assembly some
> > .SF, .DSA and .RSA files are put inside the META-INF folder of the jar,
> > possibly coming from some of the new dependencies in the deps tree.
> > Apparently, this caused this weird issue. Using an appropriate pattern
> for
> > discarding the files during the assembly or removing them via zip -d
> should
> > be enough (I sure hope so, since this is some of the worst issues I've
> come
> > across).
> >
> >
> > Federico D'Ambrosio
> >
> > Il 25 set 2017 9:51 AM, "Urs Schoenenberger" <
> urs.schoenenber...@tngtech.com>
> > ha scritto:
> >
> >> Hi Federico,
> >>
> >> just guessing, but are you explicitly setting the Main-Class manifest
> >> attribute for the jar that you are building?
> >>
> >> Should be something like
> >>
> >> mainClass in (Compile, packageBin) :=
> >> Some("org.yourorg.YourFlinkJobMainClass")
> >>
> >> Best,
> >> Urs
> >>
> >>
> >> On 23.09.2017 17:53, Federico D'Ambrosio wrote:
> >>> Hello everyone,
> >>>
> >>> I'd like to submit to you this weird issue I'm having, hoping you could
> >>> help me.
> >>> Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink
> 1.3.2
> >>> compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
> >>> So, I'm trying to implement an sink for Hive so I added the following
> >>> dependency in my build.sbt:
> >>>
> >>> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> >>> "1.2.1000.2.6.1.0-129"
> >>>
> >>> in order to use hive streaming capabilities.
> >>>
> >>> After importing this dependency, not even using it, if I try to flink
> run
> >>> the job I get
> >>>
> >>> org.apache.flink.client.program.ProgramInvocationException: The
> >> program's
> >>> entry point class 'package.MainObj' was not found in the jar file.
> >>>
> >>> If I remove the dependency, everything goes back to normal.
> >>> What is weird is that if I try to use sbt run in order to run job, *it
> >> does
> >>> find the Main class* and obviously crash because of the missing flink
> >> core
> >>> dependencies (AbstractStateBackend missing and whatnot).
> >>>
> >>> Here are the complete dependencies of the project:
> >>>
> >>> "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
> >>> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion %
> >> "provided",
> >>> "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
> >>> "org.apache.flink" %% "flink-cep-scala" % flinkVersion,
> >>> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> >>> "1.2.1000.2.6.1.0-129",
> >>> "org.joda" % "joda-convert" % "1.8.3",
> >>> "com.typesafe.play" %% "play-json" % "2.6.2",
> >>> "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2",
> >>> "org.scalactic" %% "scalactic" % "3.0.1",
> >>> "org.scalatest" %% "scalatest" % "3.0.1" % "test",
> >>> "de.javakaffee" % "kryo-serializers" % "0.42"
> >>>
> >>> Could it be an issue of dependencies conflicts between mongo-hadoop and
> >>> hive hadoop versions (respectively 2.7.1 and  2.7.3.2.6.1.0-129, even
> >>> though no issue between mongodb-hadoop and flink)? I'm even starting to
> >>> think that Flink cannot handle that well big jars (before the new
> >>> dependency it was 44M, afterwards it became 115M) when it comes to
> >>> classpath loading?
> >>>
> >>> Any help would be really appreciated,
> >>> Kind regards,
> >>> Federico
> >>>

Re: StreamCorruptedException

2017-09-25 Thread Tzu-Li (Gordon) Tai
I talked a bit with Kostas on what may be happening here.

It could be that your patterns are not closing, which depends on the pattern 
construction of your CEP job.
Could you perhaps provide an overview / code snippet of what your CEP job is 
doing?

Looping in Kostas (in CC) as well, as he may have a better idea of what is 
happening here.

Cheers,
Gordon
On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa (flinken...@gmail.com) 
wrote:

Thanks for the reply. Well, tracing back to the root cause, I see the following:

1. At the Job manager, the Checkpoint times are getting worse :

Jobmanager :

Checkpoint times are getting worse progressively.
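
To compare sizes and durations across a log like the one in this message, the "Completed checkpoint" lines can be parsed with a small helper. This is a sketch under the assumption that each wrapped log entry has first been joined back into a single line; CheckpointLine is a hypothetical name and the pattern is derived only from the lines shown here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts id, size and duration from a
// "Completed checkpoint N (X bytes in Y ms)" log line.
final class CheckpointLine {
    private static final Pattern COMPLETED =
        Pattern.compile("Completed checkpoint (\\d+) \\((\\d+) bytes in (\\d+) ms\\)");

    final long id;
    final long bytes;
    final long millis;

    private CheckpointLine(long id, long bytes, long millis) {
        this.id = id;
        this.bytes = bytes;
        this.millis = millis;
    }

    // Returns null when the line is not a "Completed checkpoint" entry.
    static CheckpointLine parse(String line) {
        Matcher matcher = COMPLETED.matcher(line);
        if (!matcher.find()) {
            return null;
        }
        return new CheckpointLine(
            Long.parseLong(matcher.group(1)),
            Long.parseLong(matcher.group(2)),
            Long.parseLong(matcher.group(3)));
    }

    public static void main(String[] args) {
        String[] lines = {
            "CheckpointCoordinator - Completed checkpoint 1 (11101233 bytes in 586 ms).",
            "CheckpointCoordinator - Triggering checkpoint 2 @ 1505538450809",
            "CheckpointCoordinator - Completed checkpoint 95 (258441766 bytes in 203268 ms)."
        };
        for (String line : lines) {
            CheckpointLine parsed = parse(line);
            if (parsed != null) {
                System.out.println(parsed.id + ": " + parsed.bytes + " bytes, " + parsed.millis + " ms");
            }
        }
    }
}
```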

2017-09-16 05:05:50,813 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1505538350809
2017-09-16 05:05:51,396 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1 (11101233 bytes in 586 ms).
2017-09-16 05:07:30,809 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 2 @ 1505538450809
2017-09-16 05:07:31,657 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 2 (18070955 bytes in 583 ms).

  ...
2017-09-16 07:32:58,117 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 89 (246125113 bytes in 27194 ms).
2017-09-16 07:34:10,809 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 90 @ 1505547250809
2017-09-16 07:34:44,932 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 90 (248272325 bytes in 34012 ms).
2017-09-16 07:35:50,809 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 91 @ 1505547350809
2017-09-16 07:36:37,058 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 91 (250348812 bytes in 46136 ms).
2017-09-16 07:37:30,809 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 92 @ 1505547450809
2017-09-16 07:38:18,076 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 92 (252399724 bytes in 47152 ms).
2017-09-16 07:39:10,809 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 93 @ 1505547550809
2017-09-16 07:40:13,494 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 93 (254374636 bytes in 62573 ms).
2017-09-16 07:40:50,809 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 94 @ 1505547650809
2017-09-16 07:42:42,850 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 94 (256386533 bytes in 111898 ms).
2017-09-16 07:42:42,850 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 95 @ 1505547762850
2017-09-16 07:46:06,241 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 95 (258441766 bytes in 203268 ms).
2017-09-16 07:46:06,241 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 96 @ 1505547966241
2017-09-16 07:48:42,069 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - 
KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) 
switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
    ... 6 more
Caused by: 

Re: Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-25 Thread Urs Schoenenberger
Hi Federico,

oh, I remember running into this problem some time ago. If I recall
correctly, this is not a flink issue, but an issue with technically
incorrect jars from dependencies which prevent the verification of the
manifest. I was using the maven-shade plugin back then and configured an
exclusion for these file types. I assume that sbt/sbt-assembly has a
similar option, this should be more stable than manually stripping the jar.
Alternatively, you could try to find out which dependency puts the
.SF/etc files there and exclude this dependency altogether, it might be
a transitive lib dependency that comes with hadoop anyways, or simply
one that you don't need anyways.
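
To find out which jar brings in the signature files, each dependency jar (or the assembled jar) can be scanned for them. The following is a minimal sketch using only the JDK; JarSignatureFiles is a hypothetical name, and the check is limited to the three extensions mentioned in this thread.

```java
import java.io.IOException;
import java.util.Locale;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper: lists signature-related entries under META-INF in a jar.
final class JarSignatureFiles {

    // True for entries such as META-INF/FOO.SF, META-INF/FOO.DSA or META-INF/FOO.RSA.
    static boolean isSignatureFile(String entryName) {
        String upper = entryName.toUpperCase(Locale.ROOT);
        if (!upper.startsWith("META-INF/")) {
            return false;
        }
        return upper.endsWith(".SF") || upper.endsWith(".DSA") || upper.endsWith(".RSA");
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: JarSignatureFiles <path-to-jar>");
            return;
        }
        try (JarFile jar = new JarFile(args[0])) {
            jar.stream()
                .map(JarEntry::getName)
                .filter(JarSignatureFiles::isSignatureFile)
                .forEach(System.out::println);
        }
    }
}
```

Running this against each dependency jar in turn shows which one carries the signature entries, which helps decide between excluding the files during assembly and excluding the dependency itself.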

Best,
Urs

On 25.09.2017 10:09, Federico D'Ambrosio wrote:
> Hi Urs,
> 
> Yes the main class is set, just like you said.
> 
> Still, I might have managed to get it working: during the assembly some
> .SF, .DSA and .RSA files are put inside the META-INF folder of the jar,
> possibly coming from some of the new dependencies in the deps tree.
> Apparently, this caused this weird issue. Using an appropriate pattern for
> discarding the files during the assembly or removing them via zip -d should
> be enough (I sure hope so, since this is some of the worst issues I've come
> across).
> 
> 
> Federico D'Ambrosio
> 
> On 25 Sep 2017 9:51 AM, "Urs Schoenenberger" wrote:
> 
>> Hi Federico,
>>
>> just guessing, but are you explicitly setting the Main-Class manifest
>> attribute for the jar that you are building?
>>
>> Should be something like
>>
>> mainClass in (Compile, packageBin) :=
>> Some("org.yourorg.YourFlinkJobMainClass")
>>
>> Best,
>> Urs
>>
>>
>> On 23.09.2017 17:53, Federico D'Ambrosio wrote:
>>> Hello everyone,
>>>
>>> I'd like to submit to you this weird issue I'm having, hoping you could
>>> help me.
>>> Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink 1.3.2
>>> compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
>>> So, I'm trying to implement an sink for Hive so I added the following
>>> dependency in my build.sbt:
>>>
>>> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
>>> "1.2.1000.2.6.1.0-129"
>>>
>>> in order to use hive streaming capabilities.
>>>
>>> After importing this dependency, not even using it, if I try to flink run
>>> the job I get
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The
>> program's
>>> entry point class 'package.MainObj' was not found in the jar file.
>>>
>>> If I remove the dependency, everything goes back to normal.
>>> What is weird is that if I try to use sbt run in order to run job, *it
>> does
>>> find the Main class* and obviously crash because of the missing flink
>> core
>>> dependencies (AbstractStateBackend missing and whatnot).
>>>
>>> Here are the complete dependencies of the project:
>>>
>>> "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>>> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion %
>> "provided",
>>> "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>> "org.apache.flink" %% "flink-cep-scala" % flinkVersion,
>>> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
>>> "1.2.1000.2.6.1.0-129",
>>> "org.joda" % "joda-convert" % "1.8.3",
>>> "com.typesafe.play" %% "play-json" % "2.6.2",
>>> "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2",
>>> "org.scalactic" %% "scalactic" % "3.0.1",
>>> "org.scalatest" %% "scalatest" % "3.0.1" % "test",
>>> "de.javakaffee" % "kryo-serializers" % "0.42"
>>>
>>> Could it be an issue of dependencies conflicts between mongo-hadoop and
>>> hive hadoop versions (respectively 2.7.1 and  2.7.3.2.6.1.0-129, even
>>> though no issue between mongodb-hadoop and flink)? I'm even starting to
>>> think that Flink cannot handle that well big jars (before the new
>>> dependency it was 44M, afterwards it became 115M) when it comes to
>>> classpath loading?
>>>
>>> Any help would be really appreciated,
>>> Kind regards,
>>> Federico
>>>

Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

2017-09-25 Thread Timo Walther

Hi,

I also replied to your Stack Overflow question. I think the problem is 
that BillCount has the wrong type and is therefore treated as one single 
black box.


Haohui's suggestion will not work because the row type needs information 
about the fields. The easiest thing is to figure out why BillCount has 
the wrong type. Make sure that it is defined statically.


What type is Record? Maybe you don't need the additional MapFunction but 
can use the Table API for mapping.


Regards,
Timo


On 9/25/17 at 9:29 AM, Haohui Mai wrote:

Hi,

I think instead of generating DataStream[BillCount], the correct way 
is to generate DataStream[Row], that is,


kafkaInputStream.map(value -> Row.of(value.getLogis_id, 
value.getProvince_id, value.getCity_id, 
value.getOrder_require_varieties, value.getOrder_rec_amount, 
value.getStore_rec_date.getTime))


That should work.

Regards,
Haohui



On Sun, Sep 24, 2017 at 6:40 PM laney0...@163.com wrote:


Hi,
     I'm confused about a problem. The following exception occurs:
"org.apache.flink.table.api.TableException: Table of atomic type can only have a 
single field."

Both BillCount and Record are classes. The code follows.

case class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int,
                     orderRecAmount: Double, orderRecDate: Long)

// source is a FlinkKafkaConsumer010 source
val kafkaInputStream: DataStream[Record] = env.addSource(source)
val tbDataStream: DataStream[BillCount] = kafkaInputStream.map(
  new MapFunction[Record, BillCount] {
    override def map(value: Record) = {
      BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
        value.getOrder_require_varieties, value.getOrder_rec_amount,
        value.getStore_rec_date.getTime)
    }
  })
// the error occurs here
val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId,
  'orderRequVari, 'orderRecAmount, 'orderRecDate)

    Error :
Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
    at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
    at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)


  Thanks.



laney0...@163.com 




Re: Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-25 Thread Federico D'Ambrosio
Hi Urs,

Yes the main class is set, just like you said.

Still, I might have managed to get it working: during the assembly some
.SF, .DSA and .RSA files are put inside the META-INF folder of the jar,
possibly coming from some of the new dependencies in the deps tree.
Apparently, this caused this weird issue. Using an appropriate pattern for
discarding the files during the assembly or removing them via zip -d should
be enough (I sure hope so, since this is one of the worst issues I've come
across).


Federico D'Ambrosio

On 25 Sep 2017 9:51 AM, "Urs Schoenenberger" wrote:

> Hi Federico,
>
> just guessing, but are you explicitly setting the Main-Class manifest
> attribute for the jar that you are building?
>
> Should be something like
>
> mainClass in (Compile, packageBin) :=
> Some("org.yourorg.YourFlinkJobMainClass")
>
> Best,
> Urs
>
>
> On 23.09.2017 17:53, Federico D'Ambrosio wrote:
> > Hello everyone,
> >
> > I'd like to submit to you this weird issue I'm having, hoping you could
> > help me.
> > Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink 1.3.2
> > compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
> > So, I'm trying to implement a sink for Hive so I added the following
> > dependency in my build.sbt:
> >
> > "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> > "1.2.1000.2.6.1.0-129"
> >
> > in order to use hive streaming capabilities.
> >
> > After importing this dependency, not even using it, if I try to flink run
> > the job I get
> >
> > org.apache.flink.client.program.ProgramInvocationException: The program's
> > entry point class 'package.MainObj' was not found in the jar file.
> >
> > If I remove the dependency, everything goes back to normal.
> > What is weird is that if I try to use sbt run in order to run job, *it does
> > find the Main class* and obviously crash because of the missing flink core
> > dependencies (AbstractStateBackend missing and whatnot).
> >
> > Here are the complete dependencies of the project:
> >
> > "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
> > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
> > "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
> > "org.apache.flink" %% "flink-cep-scala" % flinkVersion,
> > "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> > "1.2.1000.2.6.1.0-129",
> > "org.joda" % "joda-convert" % "1.8.3",
> > "com.typesafe.play" %% "play-json" % "2.6.2",
> > "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2",
> > "org.scalactic" %% "scalactic" % "3.0.1",
> > "org.scalatest" %% "scalatest" % "3.0.1" % "test",
> > "de.javakaffee" % "kryo-serializers" % "0.42"
> >
> > Could it be an issue of dependency conflicts between the mongo-hadoop and
> > hive hadoop versions (respectively 2.7.1 and 2.7.3.2.6.1.0-129, even
> > though there is no issue between mongodb-hadoop and flink)? I'm even starting
> > to think that Flink cannot handle big jars that well (before the new
> > dependency it was 44M, afterwards it became 115M) when it comes to
> > classpath loading?
> >
> > Any help would be really appreciated,
> > Kind regards,
> > Federico

Re: akka timeout

2017-09-25 Thread Till Rohrmann
Quick question, Steven: where did you find the documentation stating that
the death watch interval is linked to the akka ask timeout? It was included
in the past, but I couldn't find it anymore.

Cheers,
Till

On Mon, Sep 25, 2017 at 9:47 AM, Till Rohrmann  wrote:

> Great to hear that you could figure things out Steven.
>
> You are right. The death watch is no longer linked to the akka ask
> timeout, because of FLINK-6495. Thanks for the feedback. I will correct the
> documentation.
>
> Cheers,
> Till
>
> On Sat, Sep 23, 2017 at 10:24 AM, Steven Wu  wrote:
>
>> just to close the thread. akka death watch was triggered by high GC
>> pauses, which were caused by a memory leak in our code during Flink job
>> restart.
>>
>> noted that akka.ask.timeout wasn't related to akka death watch, although
>> the Flink documentation describes the two as linked.
>>
>> On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu  wrote:
>>
>>> this is a stateless job. so we don't use RocksDB.
>>>
>>> yeah. network can also be a possibility. will keep it on the radar.
>>> unfortunately, our metrics system doesn't have the tcp metrics when running
>>> inside containers.
>>>
>>> On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger 
>>> wrote:
>>>
>>>> Hi,
>>>> are you using the RocksDB state backend already?
>>>> Maybe writing the state to disk would actually reduce the pressure on
>>>> the GC (but of course it'll also reduce throughput a bit).
>>>>
>>>> Are there any known issues with the network? Maybe the network bursts
>>>> on restart cause the timeouts?
>>>>
>>>>
>>>> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu wrote:
>>>>
>>>>> Bowen,
>>>>>
>>>>> Heap size is ~50G. CPU was actually pretty low (like <20%) when high
>>>>> GC pause and akka timeout were happening. So maybe memory allocation
>>>>> and GC weren't really an issue. I also recently learned that the JVM
>>>>> can pause on disk I/O while writing to the GC log. That is another
>>>>> lead I am pursuing.
>>>>>
>>>>> Thanks,
>>>>> Steven
>>>>>
>>>>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li wrote:
>>>>>
>>>>>> Hi Steven,
>>>>>> Yes, GC is a big overhead, it may cause your CPU utilization to
>>>>>> reach 100%, and every process stopped working. We ran into this a
>>>>>> while ago too.
>>>>>>
>>>>>> How much memory did you assign to TaskManager? How much was your
>>>>>> CPU utilization when your taskmanager was considered 'killed'?
>>>>>>
>>>>>> Bowen
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu wrote:
>>>>>>
>>>>>>> Till,
>>>>>>>
>>>>>>> Once our job was restarted for some reason (e.g. taskmanager
>>>>>>> container got killed), it can get stuck in a continuous restart loop
>>>>>>> for hours. Right now, I suspect it is caused by GC pause during
>>>>>>> restart; our job has very high memory allocation in steady state.
>>>>>>> High GC pause then caused akka timeout, which then caused jobmanager
>>>>>>> to think taskmanager containers are unhealthy/dead and kill them.
>>>>>>> And the cycle repeats...
>>>>>>>
>>>>>>> But I haven't been able to prove or disprove it yet. When I was
>>>>>>> asking the question, I was still sifting through metrics and error logs.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Steven
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <
>>>>>>> till.rohrm...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Steven,
>>>>>>>>
>>>>>>>> quick correction for Flink 1.2. Indeed the MetricFetcher does not
>>>>>>>> pick up the right timeout value from the configuration. Instead it
>>>>>>>> uses a hardcoded 10s timeout. This has only been changed recently
>>>>>>>> and is already committed in the master. So with the next release
>>>>>>>> 1.4 it will properly pick up the right timeout settings.
>>>>>>>>
>>>>>>>> Just out of curiosity, what's the instability issue you're
>>>>>>>> observing?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu wrote:
>>>>>>>>
>>>>>>>>> Till/Chesnay, thanks for the answers. Looks like this is a
>>>>>>>>> result/symptom of the underlying stability issue that I am trying
>>>>>>>>> to track down.
>>>>>>>>>
>>>>>>>>> It is Flink 1.2.
>>>>>>>>>
>>>>>>>>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
>>>>>>>>> ches...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> The MetricFetcher always uses the default akka timeout value.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Steven,
>>>>>>>>>>
>>>>>>>>>> I thought that the MetricFetcher picks up the right timeout from
>>>>>>>>>> the configuration. Which version of Flink are you using?
>>>>>>>>>>
>>>>>>>>>> The timeout is not a critical problem for the job health.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 

Re: Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-25 Thread Urs Schoenenberger
Hi Federico,

just guessing, but are you explicitly setting the Main-Class manifest
attribute for the jar that you are building?

Should be something like

mainClass in (Compile, packageBin) :=
Some("org.yourorg.YourFlinkJobMainClass")

Best,
Urs


On 23.09.2017 17:53, Federico D'Ambrosio wrote:
> Hello everyone,
> 
> I'd like to submit to you this weird issue I'm having, hoping you could
> help me.
> Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink 1.3.2
> compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
> So, I'm trying to implement a sink for Hive so I added the following
> dependency in my build.sbt:
> 
> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> "1.2.1000.2.6.1.0-129"
> 
> in order to use hive streaming capabilities.
> 
> After importing this dependency, not even using it, if I try to flink run
> the job I get
> 
> org.apache.flink.client.program.ProgramInvocationException: The program's
> entry point class 'package.MainObj' was not found in the jar file.
> 
> If I remove the dependency, everything goes back to normal.
> What is weird is that if I try to use sbt run in order to run job, *it does
> find the Main class* and obviously crash because of the missing flink core
> dependencies (AbstractStateBackend missing and whatnot).
> 
> Here are the complete dependencies of the project:
> 
> "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
> "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
> "org.apache.flink" %% "flink-cep-scala" % flinkVersion,
> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> "1.2.1000.2.6.1.0-129",
> "org.joda" % "joda-convert" % "1.8.3",
> "com.typesafe.play" %% "play-json" % "2.6.2",
> "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2",
> "org.scalactic" %% "scalactic" % "3.0.1",
> "org.scalatest" %% "scalatest" % "3.0.1" % "test",
> "de.javakaffee" % "kryo-serializers" % "0.42"
> 
> Could it be an issue of dependency conflicts between the mongo-hadoop and
> hive hadoop versions (respectively 2.7.1 and 2.7.3.2.6.1.0-129, even
> though there is no issue between mongodb-hadoop and flink)? I'm even starting
> to think that Flink cannot handle big jars that well (before the new
> dependency it was 44M, afterwards it became 115M) when it comes to
> classpath loading?
> 
> Any help would be really appreciated,
> Kind regards,
> Federico

-- 
Urs Schönenberger - urs.schoenenber...@tngtech.com

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: akka timeout

2017-09-25 Thread Till Rohrmann
Great to hear that you could figure things out Steven.

You are right. The death watch is no longer linked to the akka ask timeout,
because of FLINK-6495. Thanks for the feedback. I will correct the
documentation.

Cheers,
Till

On Sat, Sep 23, 2017 at 10:24 AM, Steven Wu  wrote:

> just to close the thread. akka death watch was triggered by high GC pauses,
> which were caused by a memory leak in our code during Flink job restart.
>
> noted that akka.ask.timeout wasn't related to akka death watch, although
> the Flink documentation describes the two as linked.
>
> On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu  wrote:
>
>> this is a stateless job. so we don't use RocksDB.
>>
>> yeah. network can also be a possibility. will keep it on the radar.
>> unfortunately, our metrics system doesn't have the tcp metrics when running
>> inside containers.
>>
>> On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger 
>> wrote:
>>
>>> Hi,
>>> are you using the RocksDB state backend already?
>>> Maybe writing the state to disk would actually reduce the pressure on
>>> the GC (but of course it'll also reduce throughput a bit).
>>>
>>> Are there any known issues with the network? Maybe the network bursts on
>>> restart cause the timeouts?
>>>
>>>
>>> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu  wrote:
>>>
>>>> Bowen,
>>>>
>>>> Heap size is ~50G. CPU was actually pretty low (like <20%) when high GC
>>>> pause and akka timeout were happening. So maybe memory allocation and GC
>>>> weren't really an issue. I also recently learned that the JVM can pause
>>>> on disk I/O while writing to the GC log. That is another lead I am
>>>> pursuing.
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li wrote:
>>>>
>>>>> Hi Steven,
>>>>> Yes, GC is a big overhead, it may cause your CPU utilization to
>>>>> reach 100%, and every process stopped working. We ran into this a
>>>>> while ago too.
>>>>>
>>>>> How much memory did you assign to TaskManager? How much was your
>>>>> CPU utilization when your taskmanager was considered 'killed'?
>>>>>
>>>>> Bowen
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu wrote:
>>>>>
>>>>>> Till,
>>>>>>
>>>>>> Once our job was restarted for some reason (e.g. taskmanager
>>>>>> container got killed), it can get stuck in a continuous restart loop
>>>>>> for hours. Right now, I suspect it is caused by GC pause during
>>>>>> restart; our job has very high memory allocation in steady state.
>>>>>> High GC pause then caused akka timeout, which then caused jobmanager
>>>>>> to think taskmanager containers are unhealthy/dead and kill them.
>>>>>> And the cycle repeats...
>>>>>>
>>>>>> But I haven't been able to prove or disprove it yet. When I was asking
>>>>>> the question, I was still sifting through metrics and error logs.
>>>>>>
>>>>>> Thanks,
>>>>>> Steven
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <
>>>>>> till.rohrm...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Steven,
>>>>>>>
>>>>>>> quick correction for Flink 1.2. Indeed the MetricFetcher does not
>>>>>>> pick up the right timeout value from the configuration. Instead it
>>>>>>> uses a hardcoded 10s timeout. This has only been changed recently
>>>>>>> and is already committed in the master. So with the next release 1.4
>>>>>>> it will properly pick up the right timeout settings.
>>>>>>>
>>>>>>> Just out of curiosity, what's the instability issue you're observing?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu wrote:
>>>>>>>
>>>>>>>> Till/Chesnay, thanks for the answers. Looks like this is a
>>>>>>>> result/symptom of the underlying stability issue that I am trying
>>>>>>>> to track down.
>>>>>>>>
>>>>>>>> It is Flink 1.2.
>>>>>>>>
>>>>>>>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
>>>>>>>> ches...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> The MetricFetcher always uses the default akka timeout value.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>>>>>>>>
>>>>>>>>> Hi Steven,
>>>>>>>>>
>>>>>>>>> I thought that the MetricFetcher picks up the right timeout from
>>>>>>>>> the configuration. Which version of Flink are you using?
>>>>>>>>>
>>>>>>>>> The timeout is not a critical problem for the job health.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> We have set akka.ask.timeout to 60 s in yaml file. I also
>>>>>>>>>> confirmed the setting in Flink UI. But I saw akka timeout of 10 s
>>>>>>>>>> for metric query service. two questions
>>>>>>>>>> 1) why doesn't metric query use the 60 s value configured in yaml
>>>>>>>>>> file? does it always use default 10 s value?
>>>>>>>>>> 2) could this cause heartbeat failure between task manager and
>>>>>>>>>> job 

Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

2017-09-25 Thread Haohui Mai
Hi,

I think instead of generating DataStream[BillCount], the correct way is to
generate DataStream[Row], that is,

kafkaInputStream.map(value => Row.of(value.getLogis_id,
value.getProvince_id, value.getCity_id, value.getOrder_require_varieties,
value.getOrder_rec_amount, value.getStore_rec_date.getTime))

That should work.
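
A slightly fuller sketch of the Row-based approach follows, for illustration only. It assumes the Flink 1.3-era Scala Table API used in this thread and has not been verified against a specific Flink release. The sample tuple, the three-field schema and the object name `RowToTableSketch` are made up for the example, and the Kafka source is replaced by `fromElements`. The explicit `RowTypeInfo` is passed so that the stream is typed as a composite row rather than a generic (atomic) type, and values are boxed because `Row.of` takes Java objects.

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Hypothetical stand-alone job; names and sample data are illustrative.
object RowToTableSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tbEnv = TableEnvironment.getTableEnvironment(env)

    // Field types and names of the rows; a reduced version of BillCount.
    val rowType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO),
      Array("logisId", "orderRecAmount", "orderRecDate"))

    // Stand-in for the Kafka source: one (id, amount, timestamp) tuple.
    val rows: DataStream[Row] = env
      .fromElements((1, 9.5, 1506297600000L))
      // Pass the row type explicitly instead of relying on implicit derivation.
      .map(t => Row.of(Int.box(t._1), Double.box(t._2), Long.box(t._3)))(rowType)

    val table = rows.toTable(tbEnv, 'logisId, 'orderRecAmount, 'orderRecDate)
    table.printSchema()
  }
}
```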

Regards,
Haohui



On Sun, Sep 24, 2017 at 6:40 PM laney0...@163.com  wrote:

> Hi,
> I'm confused by a problem: I get the exception
> "org.apache.flink.table.api.TableException: Table of atomic type can only
> have a single field."
> Both BillCount and Record are classes. The code follows.
>
> case class BillCount(logisId: Int, provinceId: Int, cityId: Int,
>   orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
>
> val kafkaInputStream: DataStream[Record] = env.addSource(source)
> // source is a FlinkKafkaConsumer010 source
> val tbDataStream: DataStream[BillCount] = kafkaInputStream.map(
>   new MapFunction[Record, BillCount] {
>     override def map(value: Record) = {
>       BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
>         value.getOrder_require_varieties, value.getOrder_rec_amount,
>         value.getStore_rec_date.getTime)
>     }
>   })
>
> val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId,
>   'orderRequVari, 'orderRecAmount, 'orderRecDate)
> // the error occurs here
>
>
> Error:
>
> Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
> at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
> at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
> at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
> at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
> at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
> at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)
>
>
>   Thanks.
>
>
> --
> laney0...@163.com